Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/linkml_map/datamodel/transformer_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,8 @@ class ClassDerivation(ElementDerivation):
'EnumDerivation',
'PermissibleValueDerivation']} })
joins: Optional[Dict[str, AliasedClass]] = Field(default_factory=dict, description="""Additional classes to be joined to derive instances of the target class""", json_schema_extra = { "linkml_meta": {'alias': 'joins',
'comments': ['not yet implemented'],
'comments': ['supports cross-table lookups via source_key/lookup_key or the '
'join_on field'],
'domain_of': ['ClassDerivation']} })
slot_derivations: Optional[Dict[str, SlotDerivation]] = Field(default_factory=dict, json_schema_extra = { "linkml_meta": {'alias': 'slot_derivations',
'domain_of': ['TransformationSpecification', 'ClassDerivation']} })
Expand Down Expand Up @@ -341,6 +342,9 @@ class AliasedClass(ConfiguredBaseModel):

alias: str = Field(default=..., description="""name of the class to be aliased""", json_schema_extra = { "linkml_meta": {'alias': 'alias', 'domain_of': ['AliasedClass']} })
class_named: Optional[str] = Field(default=None, description="""local alias for the class""", json_schema_extra = { "linkml_meta": {'alias': 'class_named', 'domain_of': ['AliasedClass']} })
source_key: Optional[str] = Field(default=None, description="""column in the primary (populated_from) table used as the join key""", json_schema_extra = { "linkml_meta": {'alias': 'source_key', 'domain_of': ['AliasedClass']} })
lookup_key: Optional[str] = Field(default=None, description="""column in the secondary (joined) table used as the join key""", json_schema_extra = { "linkml_meta": {'alias': 'lookup_key', 'domain_of': ['AliasedClass']} })
join_on: Optional[str] = Field(default=None, description="""shorthand for source_key and lookup_key when both share the same column name""", json_schema_extra = { "linkml_meta": {'alias': 'join_on', 'domain_of': ['AliasedClass']} })


class SlotDerivation(ElementDerivation):
Expand Down
8 changes: 7 additions & 1 deletion src/linkml_map/datamodel/transformer_model.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ classes:
inlined: true
description: Additional classes to be joined to derive instances of the target class
comments:
- not yet implemented
- supports cross-table lookups via source_key/lookup_key or the join_on field
slot_derivations:
range: SlotDerivation
multivalued: true
Expand Down Expand Up @@ -220,6 +220,12 @@ classes:
description: name of the class to be aliased
class_named:
description: local alias for the class
source_key:
description: column in the primary (populated_from) table used as the join key
lookup_key:
description: column in the secondary (joined) table used as the join key
join_on:
description: shorthand for source_key and lookup_key when both share the same column name

SlotDerivation:
is_a: ElementDerivation
Expand Down
14 changes: 14 additions & 0 deletions src/linkml_map/loaders/data_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ def _find_file(self, identifier: str) -> Optional[Path]:

return None

def get_path(self, identifier: str) -> Path:
    """
    Return the resolved file path for *identifier*.

    :param identifier: Logical table/file name (without extension).
    :returns: Absolute path to the matching data file.
    :raises FileNotFoundError: If no matching file is found.
    """
    resolved = self._find_file(identifier)
    if resolved is not None:
        return resolved.resolve()
    raise FileNotFoundError(
        f"No data file found for identifier {identifier!r} under {self.base_path}"
    )

def __contains__(self, identifier: str) -> bool:
"""Check if a data file exists for the given identifier."""
if self.is_single_file:
Expand Down
81 changes: 81 additions & 0 deletions src/linkml_map/transformer/engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Spec-driven processing engine with cross-table lookup support."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from linkml_map.utils.lookup_index import LookupIndex

if TYPE_CHECKING:
from collections.abc import Iterator

from linkml_map.loaders.data_loaders import DataLoader
from linkml_map.transformer.object_transformer import ObjectTransformer

logger = logging.getLogger(__name__)


def transform_spec(
    transformer: ObjectTransformer,
    data_loader: DataLoader,
    source_type: str | None = None,
) -> Iterator[dict[str, Any]]:
    """
    Iterate class_derivation blocks and stream transformed rows.

    For each block whose ``populated_from`` names a loadable table, this
    function:

    1. Registers any ``joins`` as secondary tables in a :class:`LookupIndex`.
    2. Streams primary-table rows through
       :meth:`ObjectTransformer.map_object`.
    3. Drops secondary tables when the block is done.

    :param transformer: A configured :class:`ObjectTransformer`.
    :param data_loader: Loader that can resolve table names to file paths.
    :param source_type: Optional explicit source type override applied to
        every block; defaults to each block's own table name.
    :returns: Iterator of transformed row dicts.
    :raises ValueError: If a join spec lacks usable key columns (raised
        lazily, on first iteration of the affected block).
    """
    spec = transformer.derived_specification
    if spec is None:
        return

    if transformer.lookup_index is None:
        transformer.lookup_index = LookupIndex()

    # class_derivations is modelled as Dict[str, ClassDerivation] in the
    # transformer datamodel; bare iteration over a dict would yield the key
    # strings, not the derivation objects.  Normalize so both dict- and
    # list-shaped specifications work.
    derivations = spec.class_derivations or []
    if isinstance(derivations, dict):
        derivations = list(derivations.values())

    for class_deriv in derivations:
        table_name = class_deriv.populated_from or class_deriv.name
        if table_name not in data_loader:
            logger.debug("Skipping class_derivation %s: no data found", class_deriv.name)
            continue

        joined_tables: list[str] = []
        try:
            # Register secondary (joined) tables before touching primary rows.
            if class_deriv.joins:
                for join_name, join_spec in class_deriv.joins.items():
                    # 'join_on' is shorthand for both keys naming one column.
                    lookup_key = join_spec.lookup_key or join_spec.join_on
                    source_key = join_spec.source_key or join_spec.join_on
                    if not lookup_key or not source_key:
                        msg = (
                            f"Join {join_name!r} must specify 'join_on' or both "
                            f"'source_key' and 'lookup_key'"
                        )
                        raise ValueError(msg)
                    join_path = data_loader.get_path(join_name)
                    transformer.lookup_index.register_table(
                        join_name, join_path, lookup_key
                    )
                    joined_tables.append(join_name)

            # Stream primary table rows through the transformer.
            for row in data_loader[table_name]:
                yield transformer.map_object(
                    row,
                    source_type=source_type or table_name,
                    class_derivation=class_deriv,
                )
        finally:
            # Always release secondary tables, even on error or early close()
            # of the generator, so one block's joins never leak into the next.
            for jt in joined_tables:
                transformer.lookup_index.drop(jt)
29 changes: 28 additions & 1 deletion src/linkml_map/transformer/object_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pydantic import BaseModel

from linkml_map.datamodel.transformer_model import (
AliasedClass,
ClassDerivation,
CollectionType,
PivotDirectionType,
Expand Down Expand Up @@ -46,13 +47,15 @@ def __init__( # noqa: PLR0913
source_type: str,
sv: SchemaView,
bindings: dict,
join_specs: Optional[dict[str, AliasedClass]] = None,
) -> None:
self.object_transformer: ObjectTransformer = object_transformer
self.source_obj: OBJECT_TYPE = source_obj
self.source_obj_typed: OBJECT_TYPE = source_obj_typed
self.source_type: str = source_type
self.sv: SchemaView = sv
self.bindings: dict = {}
self.join_specs: dict[str, AliasedClass] = join_specs or {}
if bindings:
self.bindings.update(bindings)

Expand Down Expand Up @@ -105,10 +108,32 @@ def __iter__(self) -> Iterator:

def __getitem__(self, name: Any) -> Any:
    """Resolve *name* lazily: a configured join lookup, else a source binding."""
    if name in self.bindings:
        return self.bindings.get(name)
    if name not in self.join_specs:
        # Bind the raw source value through the normal context machinery.
        self.get_ctxt_obj_and_dict({name: self.source_obj[name]})
        return self.bindings.get(name)
    if self.object_transformer.lookup_index is None:
        raise ValueError(
            f"Join configured for {name!r} but lookup_index has not been initialized"
        )
    self.bindings[name] = self._resolve_join(name)
    return self.bindings.get(name)

def _resolve_join(self, table_name: str) -> Optional[DynObj]:
    """Resolve a cross-table lookup, returning a DynObj or None."""
    join = self.join_specs[table_name]
    # 'join_on' is shorthand for both keys naming the same column.
    src_col = join.source_key or join.join_on
    lkp_col = join.lookup_key or join.join_on
    if not (src_col and lkp_col):
        raise ValueError(
            f"Join spec for {table_name!r} must specify 'join_on' or both 'source_key' and 'lookup_key'"
        )
    join_value = self.source_obj.get(src_col)
    if join_value is None:
        # No key in the primary row — nothing to look up.
        return None
    matched = self.object_transformer.lookup_index.lookup_row(
        table_name, lkp_col, join_value
    )
    return DynObj(**matched) if matched is not None else None

def __setitem__(self, name: Any, value: Any) -> None:
del name, value
msg = f"__setitem__ not allowed on class {self.__class__.__name__}"
Expand All @@ -124,6 +149,7 @@ class ObjectTransformer(Transformer):
"""

object_index: ObjectIndex = None
lookup_index: Any = None # Optional[LookupIndex] — lazy import to avoid hard duckdb dep

def index(self, source_obj: Any, target: Optional[str] = None) -> None:
"""
Expand Down Expand Up @@ -264,6 +290,7 @@ def map_object(
source_type=source_type,
sv=sv,
bindings={"NULL": None},
join_specs=class_deriv.joins if class_deriv.joins else None,
)

try:
Expand Down
5 changes: 3 additions & 2 deletions src/linkml_map/utils/eval_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,13 @@ def _eval_set(self, node: ast.Set) -> Any: # noqa: ANN401
msg = "The {} must enclose a single variable"
raise ValueError(msg)
e = node.elts[0]
if not isinstance(e, ast.Name):
if not isinstance(e, (ast.Name, ast.Attribute)):
msg = "The {} must enclose a variable"
raise TypeError(msg)
v = self._eval(e)
if v is None:
msg = f"{e.id} is not set"
label = ast.dump(e) if isinstance(e, ast.Attribute) else e.id
msg = f"{label} is not set"
raise UnsetValueError(msg)
return v

Expand Down
90 changes: 90 additions & 0 deletions src/linkml_map/utils/lookup_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""DuckDB-backed cross-table lookup index for join resolution."""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any

import duckdb

_IDENTIFIER_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def _validate_identifier(name: str) -> None:
"""Validate that *name* is a safe SQL identifier."""
if not _IDENTIFIER_RE.match(name):
msg = f"Invalid identifier: {name!r}"
raise ValueError(msg)


class LookupIndex:
    """
    In-memory DuckDB index for cross-table lookups.

    Each registered table is loaded from a CSV/TSV file via ``read_csv_auto``
    and indexed on a key column for fast single-row lookups.  All columns are
    loaded as VARCHAR, so lookup values are compared as strings.

    The index owns a DuckDB connection; use it as a context manager (or call
    :meth:`close`) to release the connection deterministically.
    """

    def __init__(self) -> None:
        """Initialize an empty lookup index with an in-memory DuckDB connection."""
        self._conn = duckdb.connect(":memory:")
        self._tables: dict[str, str] = {}  # table_name -> key_column

    def __enter__(self) -> LookupIndex:
        """Enter a ``with`` block, returning the index itself."""
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the DuckDB connection on leaving a ``with`` block."""
        self.close()

    def register_table(self, name: str, file_path: Path | str, key_column: str) -> None:
        """
        Load a CSV/TSV file into DuckDB and create an index on *key_column*.

        Re-registering an existing *name* replaces the table contents.

        :param name: Logical table name (must be a valid identifier).
        :param file_path: Path to a CSV or TSV file.
        :param key_column: Column to index for lookups.
        :raises ValueError: If *name* or *key_column* is not a safe identifier.
        """
        # Identifier validation guards the f-string interpolation below
        # against SQL injection; the file path is passed as a bound parameter.
        _validate_identifier(name)
        _validate_identifier(key_column)
        file_path = Path(file_path)
        self._conn.execute(
            f"CREATE OR REPLACE TABLE {name} AS "  # noqa: S608
            "SELECT * FROM read_csv_auto(?, all_varchar=true)",
            [str(file_path)]
        )
        # NOTE: index names derived from name+column could collide for
        # pathological underscore combinations; IF NOT EXISTS keeps that a
        # perf-only concern, never an error.
        self._conn.execute(
            f"CREATE INDEX IF NOT EXISTS idx_{name}_{key_column} ON {name} ({key_column})"  # noqa: S608
        )
        self._tables[name] = key_column

    def lookup_row(
        self, table: str, key_col: str, key_val: Any  # noqa: ANN401
    ) -> dict[str, Any] | None:
        """
        Return the first row matching *key_val* on *key_col*, or ``None``.

        :param table: Previously registered table name.
        :param key_col: Column to match on.
        :param key_val: Value to look up.
        :returns: Row as a dict, or None if not found.
        :raises ValueError: If *table* or *key_col* is not a safe identifier.
        """
        _validate_identifier(table)
        _validate_identifier(key_col)
        # key_val is coerced to str because tables load with all_varchar=true.
        result = self._conn.execute(
            f"SELECT * FROM {table} WHERE {key_col} = $1 LIMIT 1",  # noqa: S608
            [str(key_val)],
        ).fetchone()
        if result is None:
            return None
        columns = [desc[0] for desc in self._conn.description]
        return dict(zip(columns, result))

    def drop(self, table: str) -> None:
        """Drop a registered table, releasing memory."""
        _validate_identifier(table)
        self._conn.execute(f"DROP TABLE IF EXISTS {table}")  # noqa: S608
        self._tables.pop(table, None)

    def is_registered(self, table: str) -> bool:
        """Check whether *table* has been registered."""
        return table in self._tables

    def close(self) -> None:
        """Close the DuckDB connection and forget all registrations."""
        self._conn.close()
        self._tables.clear()
Loading