Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 7 additions & 26 deletions src/ldlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"""

import sys
from datetime import datetime, timezone
from typing import TYPE_CHECKING, NoReturn, cast

import duckdb
Expand All @@ -53,7 +52,7 @@
autocommit,
sqlid,
)
from .database import Database, LoadHistory
from .database import Database

if TYPE_CHECKING:
from collections.abc import Iterator
Expand Down Expand Up @@ -308,7 +307,12 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
if self.db is None or self._database is None:
self._check_db()
return []
start = datetime.now(timezone.utc)

self._database.prepare_history(
table,
path,
query if query and isinstance(query, str) else None,
)
if not self._quiet:
print("ldlite: querying: " + path, file=sys.stderr)

Expand All @@ -328,7 +332,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
file=sys.stderr,
)

download_started = datetime.now(timezone.utc)
processed = self._database.ingest_records(
table,
cast(
Expand All @@ -346,10 +349,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
),
),
)
download = datetime.now(timezone.utc)
download_elapsed = datetime.now(timezone.utc) - download_started

transform_started = datetime.now(timezone.utc)
if not use_legacy_transform:
no_iters_format = (
"{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"
Expand Down Expand Up @@ -377,15 +377,13 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
)
if keep_raw:
newtables = [table, *newtables]
transform_elapsed = datetime.now(timezone.utc) - transform_started

with tqdm(
desc="indexing",
leave=False,
disable=self._quiet,
bar_format=no_iters_format,
) as progress:
index_started = datetime.now(timezone.utc)
self._database.index_prefix(table, progress)

else:
Expand Down Expand Up @@ -424,10 +422,7 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
finally:
autocommit(self.db, self.dbtype, True)

transform_elapsed = datetime.now(timezone.utc) - transform_started

# Create indexes on id columns (for postgres)
index_started = datetime.now(timezone.utc)
if self.dbtype == DBType.POSTGRES:

class PbarNoop:
Expand Down Expand Up @@ -464,20 +459,6 @@ def close(self) -> None: ...
pbar.update(1)
pbar.close()

index_elapsed = datetime.now(timezone.utc) - index_started
self._database.record_history(
LoadHistory(
table,
path,
query if query and isinstance(query, str) else None,
processed,
download,
start,
download_elapsed,
transform_elapsed,
index_elapsed,
),
)
# Return table names
if not self._quiet:
print("ldlite: created tables: " + ", ".join(newtables), file=sys.stderr)
Expand Down
21 changes: 2 additions & 19 deletions src/ldlite/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,14 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, NoReturn

if TYPE_CHECKING:
import datetime
from collections.abc import Iterator

from tqdm import tqdm


@dataclass(frozen=True)
class LoadHistory:
"""Represents the statistics and history of a single ldlite operation."""

table_name: str
path: str
query: str | None
total: int
download_time: datetime.datetime
start_time: datetime.datetime
download_interval: datetime.timedelta
transform_interval: datetime.timedelta
index_interval: datetime.timedelta


class Database(ABC):
"""The required interface for LDLite to utilite a database."""

Expand Down Expand Up @@ -69,5 +52,5 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N
"""Finds and indexes all tables at the given prefix."""

@abstractmethod
def record_history(self, history: LoadHistory) -> None:
"""Records the statistics and history of a single ldlite operation."""
def prepare_history(self, prefix: str, path: str, query: str | None) -> None:
"""Creates an entry with the current parameters in the history table."""
7 changes: 6 additions & 1 deletion src/ldlite/database/_duckdb.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections.abc import Iterator
from datetime import datetime, timezone
from itertools import count
from typing import TYPE_CHECKING, Any, cast

Expand Down Expand Up @@ -89,6 +90,7 @@ def ingest_records(
records: Iterator[bytes],
) -> int:
pfx = Prefix(prefix)
download_started = datetime.now(timezone.utc)
pkey = count(1)
with self._conn_factory() as conn:
self._prepare_raw_table(conn, pfx)
Expand All @@ -102,9 +104,12 @@ def ingest_records(
with conn.begin() as tx, tx.cursor() as cur:
for r in records:
cur.execute(insert_sql, (next(pkey), r.decode()))

total = next(pkey) - 1
self._download_complete(conn, pfx, total, download_started)
tx.commit()

return next(pkey) - 1
return total

def source_table_cte_stmt(self, keep_source: bool) -> str:  # noqa: ARG002
    """Return the CTE header exposing the raw table as ``ld_source``.

    The ``{source_table}`` placeholder is filled in later by the caller.
    ``keep_source`` is intentionally ignored for this backend (hence the
    ARG002 suppression).
    """
    cte = "WITH ld_source AS (SELECT * FROM {source_table})"
    return cte
Expand Down
1 change: 0 additions & 1 deletion src/ldlite/database/_expansion/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ def unnest(
SELECT
{cols}
FROM ld_source
WHERE NOT ldlite_system.jis_null({json_col})
""",
)
.format(
Expand Down
5 changes: 5 additions & 0 deletions src/ldlite/database/_postgres.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from collections.abc import Iterator
from datetime import datetime, timezone
from itertools import count

import psycopg
Expand Down Expand Up @@ -120,6 +121,7 @@ def ingest_records(
records: Iterator[bytes],
) -> int:
pfx = Prefix(prefix)
download_started = datetime.now(timezone.utc)
pkey = count(1)
with self._conn_factory() as conn:
self._prepare_raw_table(conn, pfx)
Expand All @@ -141,7 +143,10 @@ def ingest_records(
rb.extend(r)
copy.write_row((next(pkey).to_bytes(4, "big"), rb))

total = next(pkey) - 1
self._download_complete(conn, pfx, total, download_started)
conn.commit()

return next(pkey) - 1

def preprocess_source_table(
Expand Down
130 changes: 107 additions & 23 deletions src/ldlite/database/_typed_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@

from abc import abstractmethod
from contextlib import closing
from datetime import timezone
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Generic, NoReturn, TypeVar, cast
from uuid import uuid4

import psycopg
from psycopg import sql
from tqdm import tqdm

from . import Database, LoadHistory
from . import Database
from ._expansion import expand_nonmarc
from ._expansion.context import ExpandContext
from ._prefix import Prefix
Expand All @@ -37,9 +37,14 @@ def __init__(self, conn_factory: Callable[[], DB]):
"table_name" TEXT UNIQUE
,"path" TEXT
,"query" TEXT
,"row_count" INTEGER
,"download_complete_utc" TIMESTAMP
,"start_utc" TIMESTAMP

,"rowcount" INTEGER
,"download_complete" TIMESTAMPTZ

,"final_rowcount" INTEGER
,"transform_complete" TIMESTAMPTZ
,"data_refreshed" TIMESTAMPTZ

,"download_time" INTERVAL
,"transform_time" INTERVAL
,"index_time" INTERVAL
Expand Down Expand Up @@ -195,6 +200,7 @@ def expand_prefix(
transform_progress: tqdm[NoReturn] | None = None,
) -> list[str]:
pfx = Prefix(prefix)
transform_started = datetime.now(timezone.utc)
with closing(self._conn_factory()) as conn:
self._drop_extracted_tables(conn, pfx)
if json_depth < 1:
Expand Down Expand Up @@ -262,12 +268,22 @@ def expand_prefix(
[(pfx.catalog_table_row(t),) for t in created_tables],
)

with conn.cursor() as cur:
cur.execute(
sql.SQL("SELECT COUNT(*) FROM {table}")
.format(table=pfx.output_table("").id)
.as_string(),
)
total = cast("tuple[int]", cur.fetchone())[0]

self._transform_complete(conn, pfx, total, transform_started)
conn.commit()

return created_tables

def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> None:
pfx = Prefix(prefix)
index_started = datetime.now(timezone.utc)
with closing(self._conn_factory()) as conn:
with closing(conn.cursor()) as cur:
cur.execute(
Expand Down Expand Up @@ -321,33 +337,101 @@ def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> N
if progress is not None:
progress.update(1)

self._index_complete(conn, pfx, index_started)
conn.commit()

def record_history(self, history: LoadHistory) -> None:
with closing(self._conn_factory()) as conn, conn.cursor() as cur:
def prepare_history(
    self,
    prefix: str,
    path: str,
    query: str | None,
) -> None:
    """Create or reset the load-history row for *prefix* before a new load.

    Inserts (table_name, path, query) into ``load_history_v1``; if a row
    for the table already exists, the request parameters are replaced and
    every per-run statistic is cleared so the upcoming load starts from a
    clean slate.  ``data_refreshed`` is deliberately left untouched so the
    last successful refresh timestamp survives until the next transform
    completes.

    Args:
        prefix: Table prefix identifying the load target.
        path: The source path being queried.
        query: The query string, or None when no textual query was given.
    """
    with closing(self._conn_factory()) as conn, closing(conn.cursor()) as cur:
        # NOTE: the previous conflict clause referenced the pre-v1 column
        # names ("row_count", "download_complete_utc", "start_utc"), which
        # no longer exist in the v1 schema and would fail at runtime.  On
        # re-load we now reset the current statistic columns to NULL.
        cur.execute(
            """
            INSERT INTO "ldlite_system"."load_history_v1"
            (
                "table_name"
                ,"path"
                ,"query"
            )
            VALUES($1,$2,$3)
            ON CONFLICT ("table_name") DO UPDATE SET
                "path" = EXCLUDED."path"
                ,"query" = EXCLUDED."query"
                ,"rowcount" = NULL
                ,"download_complete" = NULL
                ,"final_rowcount" = NULL
                ,"transform_complete" = NULL
                ,"download_time" = NULL
                ,"transform_time" = NULL
                ,"index_time" = NULL
            """,
            (
                Prefix(prefix).load_history_key,
                path,
                query,
            ),
        )
        conn.commit()

def _download_complete(
    self,
    conn: DB,
    pfx: Prefix,
    rowcount: int,
    download_start: datetime,
) -> None:
    """Record download statistics for *pfx* in the load-history table.

    Args:
        conn: An open connection; the caller commits the transaction.
        pfx: Prefix whose history row is updated.
        rowcount: Number of raw records ingested.
        download_start: UTC timestamp taken just before the download began.
    """
    # Capture "now" exactly once so the stored completion timestamp and
    # the derived elapsed interval are mutually consistent
    # (download_complete == download_start + download_time).
    now = datetime.now(timezone.utc)
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE "ldlite_system"."load_history_v1" SET
                "rowcount" = $2
                ,"download_complete" = $3
                ,"download_time" = $4
            WHERE "table_name" = $1;
            """,
            (
                pfx.load_history_key,
                rowcount,
                now,
                now - download_start,
            ),
        )

def _transform_complete(
    self,
    conn: DB,
    pfx: Prefix,
    final_rowcount: int,
    transform_start: datetime,
) -> None:
    """Record transform statistics for *pfx* in the load-history table.

    Also copies ``download_complete`` into ``data_refreshed`` — once the
    transform succeeds, the data is considered refreshed as of the moment
    the download finished.

    Args:
        conn: An open connection; the caller commits the transaction.
        pfx: Prefix whose history row is updated.
        final_rowcount: Row count of the transformed output table.
        transform_start: UTC timestamp taken just before the transform began.
    """
    # Capture "now" exactly once so the stored completion timestamp and
    # the derived elapsed interval are mutually consistent.
    now = datetime.now(timezone.utc)
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE "ldlite_system"."load_history_v1" SET
                "final_rowcount" = $2
                ,"transform_complete" = $3
                ,"data_refreshed" = "download_complete"
                ,"transform_time" = $4
            WHERE "table_name" = $1
            """,
            (
                pfx.load_history_key,
                final_rowcount,
                now,
                now - transform_start,
            ),
        )

def _index_complete(
    self,
    conn: DB,
    pfx: Prefix,
    index_start: datetime,
) -> None:
    """Store the elapsed indexing time for *pfx* in the load-history table.

    Args:
        conn: An open connection; the caller commits the transaction.
        pfx: Prefix whose history row is updated.
        index_start: UTC timestamp taken just before indexing began.
    """
    elapsed = datetime.now(timezone.utc) - index_start
    params = (pfx.load_history_key, elapsed)
    with conn.cursor() as cur:
        cur.execute(
            """
            UPDATE "ldlite_system"."load_history_v1" SET
                "index_time" = $2
            WHERE "table_name" = $1
            """,
            params,
        )
Loading
Loading