From 7b723fe86b93405aaa75917f344afe91be8553ad Mon Sep 17 00:00:00 2001 From: dnwpark Date: Fri, 16 Jan 2026 13:26:39 -0800 Subject: [PATCH 01/15] Add qblike_3. --- tests/test_qblike_3.py | 319 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 319 insertions(+) create mode 100644 tests/test_qblike_3.py diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py new file mode 100644 index 0000000..0aeff9b --- /dev/null +++ b/tests/test_qblike_3.py @@ -0,0 +1,319 @@ +import dataclasses +import enum +import textwrap + +from typing import ( + Callable, + ForwardRef, + Literal, + Never, + ReadOnly, + Self, + TypedDict, +) + +from typemap.type_eval import eval_typing +from typemap.typing import ( + Attrs, + GetArg, + GetAttr, + GetName, + GetQuals, + GetType, + GetInit, + InitField, + Iter, + Member, + NewProtocol, + Param, + IsSub, +) + +from . import format_helper + + +type ReplaceNever[T, D] = T if not IsSub[T, Never] else D +type GetFieldItem[T: InitField, K, Default] = ReplaceNever[ + GetAttr[GetArg[T, InitField, Literal[0]], K], Default +] + + +@dataclasses.dataclass(frozen=True) +class DbBoolean: + pass + + +@dataclasses.dataclass(frozen=True) +class DbInteger: + pass + + +@dataclasses.dataclass(frozen=True) +class DbString: + length: int + + +type DbType = DbInteger | DbString + + +class Table[name: str]: + pass + + +class FieldArgs(TypedDict, total=False): + primary_key: ReadOnly[bool] + db_type: ReadOnly[DbType] + nullable: ReadOnly[bool] + unique: ReadOnly[bool] + autoincrement: ReadOnly[bool] = False + default: ReadOnly[object] + + +class Field[Args: FieldArgs](InitField[Args]): + pass + + +type FieldIsNullable[Init] = GetFieldItem[ + Init, Literal["nullable"], Literal[True] +] +type FieldIsAutoincrement[Init] = GetFieldItem[ + Init, Literal["autoincrement"], Literal[False] +] +type FieldHasDefault[Init] = ( + Literal[True] + if not IsSub[GetFieldItem[Init, Literal["default"], Never], Never] + else Literal[False] +) + + +type FieldIsRequiredForCreate[Init] = ( + Literal[True] + if not IsSub[Literal[True], FieldIsNullable[Init]] + and not IsSub[Literal[True], FieldIsAutoincrement[Init]] + and not IsSub[Literal[True], FieldHasDefault[Init]] + else Literal[False] +) +type FieldIsDefaultNone[Init] = ( + Literal[True] + if IsSub[Literal[True], FieldIsNullable[Init]] + and not IsSub[Literal[True], FieldIsAutoincrement[Init]] + and IsSub[GetFieldItem[Init, Literal["default"], None], None] + else Literal[False] +) + + +class Cardinality(enum.Enum): + ONE = "ONE" + MANY = "MANY" + + +class DbLinkTargetArgs(TypedDict, total=False): + target: ReadOnly[type[Table] | ForwardRef] + cardinality: ReadOnly[Cardinality] = Cardinality.ONE + + +class DbLinkTarget[Args: DbLinkTargetArgs](InitField[Args]): + pass + + +class DbLinkSourceArgs(TypedDict, total=False): + source: ReadOnly[type[Table] | ForwardRef] + cardinality: ReadOnly[Cardinality] = Cardinality.ONE + + +class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): + pass + + +class Default: + pass + + +type InitFnType[T] = Member[ + Literal["__init__"], + Callable[ + [ + Param[Literal["self"], Self], + *[ + Param[ + GetName[p], + ( + GetType[p] + if IsSub[ + Literal[True], FieldIsRequiredForCreate[GetInit[p]] + ] + else GetType[p] | None + if IsSub[Literal[True], FieldIsDefaultNone[GetInit[p]]] + else GetType[p] | Default + ), + ( + Literal["keyword"] + if IsSub[ + Literal[True], FieldIsRequiredForCreate[GetInit[p]] + ] + else Literal["keyword", "default"] + ), + ] + for p in Iter[Attrs[T]] + if not IsSub[ + GetFieldItem[GetInit[p], Literal["db_type"], Never], + DbLinkSource, + ] + ], + ], + None, + ], + Literal["ClassVar"], +] +type AddInit[T] = NewProtocol[ + InitFnType[T], + *[Member[GetName[p], GetType[p], GetQuals[p]] for p in Iter[Attrs[T]]], +] + + +class NoChange: + pass + + +type Create[T] = NewProtocol[ + *[ + Member[ + GetName[p], + ( + GetType[p] + if IsSub[ + Literal[True], + FieldIsRequiredForCreate[GetInit[p]], + ] + else GetType[p] | None + if IsSub[ + Literal[True], + FieldIsDefaultNone[GetInit[p]], + ] + else GetType[p] | Default + ), + GetQuals[p], + ] + for p in Iter[Attrs[T]] + if not IsSub[ + Literal[True], + GetFieldItem[GetInit[p], Literal["primary_key"], Never], + ] + and not IsSub[ + GetFieldItem[GetInit[p], Literal["db_type"], Never], DbLinkSource + ] + ], +] +type Update[T] = NewProtocol[ + *[ + Member[GetName[p], GetType[p] | NoChange, GetQuals[p]] + for p in Iter[Attrs[T]] + if not IsSub[ + Literal[True], + GetFieldItem[GetInit[p], Literal["primary_key"], Never], + ] + and not IsSub[ + GetFieldItem[GetInit[p], Literal["db_type"], Never], DbLinkSource + ] + ], +] + + +class User(Table[Literal["users"]]): + id: int = Field(db_type=DbInteger(), primary_key=True, autoincrement=True) + name: str = Field(db_type=DbString(length=50), nullable=False) + email: str = Field( + db_type=DbString(length=100), unique=True, nullable=False + ) + age: int | None = Field(db_type=DbInteger()) + active: bool = Field(db_type=DbBoolean(), default=True) + posts: list[Post] = Field( + db_type=DbLinkSource( + source=ForwardRef("Post"), cardinality=Cardinality.MANY + ) + ) + + +class Post(Table[Literal["posts"]]): + id: int = Field(db_type=DbInteger(), primary_key=True, autoincrement=True) + content: str = Field(db_type=DbString(length=1000), nullable=False) + author: User = Field( + db_type=DbLinkTarget(target=ForwardRef("User")), nullable=False + ) + + +def test_qblike_3_add_init_01(): + tgt = eval_typing(AddInit[User]) + fmt = format_helper.format_class(tgt) + + assert fmt == textwrap.dedent("""\ + class AddInit[tests.test_qblike_3.User]: + id: int + name: str + email: str + age: int | None + active: bool + posts: list[tests.test_qblike_3.Post] + def __init__(self: Self, *, id: int | tests.test_qblike_3.Default = ..., name: str, email: str, age: int | None = ..., active: bool | tests.test_qblike_3.Default = ...) -> None: ... + """) + + +def test_qblike_3_add_init_02(): + tgt = eval_typing(AddInit[Post]) + fmt = format_helper.format_class(tgt) + + assert fmt == textwrap.dedent("""\ + class AddInit[tests.test_qblike_3.Post]: + id: int + content: str + author: tests.test_qblike_3.User + def __init__(self: Self, *, id: int | tests.test_qblike_3.Default = ..., content: str, author: tests.test_qblike_3.User) -> None: ... + """) + + +def test_qblike_3_create_01(): + tgt = eval_typing(Create[User]) + fmt = format_helper.format_class(tgt) + + assert fmt == textwrap.dedent("""\ + class Create[tests.test_qblike_3.User]: + name: str + email: str + age: int | None + active: bool | tests.test_qblike_3.Default + """) + + +def test_qblike_3_create_02(): + tgt = eval_typing(Create[Post]) + fmt = format_helper.format_class(tgt) + + assert fmt == textwrap.dedent("""\ + class Create[tests.test_qblike_3.Post]: + content: str + author: tests.test_qblike_3.User + """) + + +def test_qblike_3_update_01(): + tgt = eval_typing(Update[User]) + fmt = format_helper.format_class(tgt) + + assert fmt == textwrap.dedent("""\ + class Update[tests.test_qblike_3.User]: + name: str | tests.test_qblike_3.NoChange + email: str | tests.test_qblike_3.NoChange + age: int | None | tests.test_qblike_3.NoChange + active: bool | tests.test_qblike_3.NoChange + """) + + +def test_qblike_3_update_02(): + tgt = eval_typing(Update[Post]) + fmt = format_helper.format_class(tgt) + + assert fmt == textwrap.dedent("""\ + class Update[tests.test_qblike_3.Post]: + content: str | tests.test_qblike_3.NoChange + author: tests.test_qblike_3.User | tests.test_qblike_3.NoChange + """) From 7d2da99b5b34e65f6c1db5aa812ad84eb6769da7 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Fri, 16 Jan 2026 13:35:24 -0800 Subject: [PATCH 02/15] Some comments. --- tests/test_qblike_3.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 0aeff9b..8a47363 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -32,6 +32,31 @@ from . import format_helper +""" +An example of a SQL-Alchemy like ORM. + +The User and Post classes model a SQLite schema: +``` +CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + email TEXT UNIQUE NOT NULL, + age INTEGER, + active BOOLEAN DEFAULT TRUE +); + +CREATE TABLE posts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL, + author_id INTEGER NOT NULL, + FOREIGN KEY (author_id) REFERENCES users (id) +); +``` + +Protocols are generated using AddInit[T], Create[T], and Update[T]. +""" + + type ReplaceNever[T, D] = T if not IsSub[T, Never] else D type GetFieldItem[T: InitField, K, Default] = ReplaceNever[ GetAttr[GetArg[T, InitField, Literal[0]], K], Default From 123b1bce1f79f44eb1135edc02ade2518331f536 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Mon, 26 Jan 2026 08:55:41 -0800 Subject: [PATCH 03/15] Include table name in field. --- tests/test_qblike_3.py | 65 +++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 8a47363..8d21ac2 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -3,6 +3,7 @@ import textwrap from typing import ( + Any, Callable, ForwardRef, Literal, @@ -85,7 +86,15 @@ class Table[name: str]: pass -class FieldArgs(TypedDict, total=False): +class Field[Table, PyType]: + def __lt__(self, other: Any) -> Filter: ... + + +type FieldTable[T] = GetArg[GetType[T], Field, Literal[0]] +type FieldPyType[T] = GetArg[GetType[T], Field, Literal[1]] + + +class ColumnArgs(TypedDict, total=False): primary_key: ReadOnly[bool] db_type: ReadOnly[DbType] nullable: ReadOnly[bool] @@ -94,7 +103,7 @@ class FieldArgs(TypedDict, total=False): default: ReadOnly[object] -class Field[Args: FieldArgs](InitField[Args]): +class column[Args: ColumnArgs](InitField[Args]): pass @@ -127,6 +136,10 @@ class Field[Args: FieldArgs](InitField[Args]): ) +class Filter: + pass + + class Cardinality(enum.Enum): ONE = "ONE" MANY = "MANY" @@ -163,13 +176,13 @@ class Default: Param[ GetName[p], ( - GetType[p] + FieldPyType[p] if IsSub[ Literal[True], FieldIsRequiredForCreate[GetInit[p]] ] - else GetType[p] | None + else FieldPyType[p] | None if IsSub[Literal[True], FieldIsDefaultNone[GetInit[p]]] - else GetType[p] | Default + else FieldPyType[p] | Default ), ( Literal["keyword"] @@ -192,7 +205,7 @@ class Default: ] type AddInit[T] = NewProtocol[ InitFnType[T], - *[Member[GetName[p], GetType[p], GetQuals[p]] for p in Iter[Attrs[T]]], + *[Member[GetName[p], FieldPyType[p], GetQuals[p]] for p in Iter[Attrs[T]]], ] @@ -205,17 +218,17 @@ class NoChange: Member[ GetName[p], ( - GetType[p] + FieldPyType[p] if IsSub[ Literal[True], FieldIsRequiredForCreate[GetInit[p]], ] - else GetType[p] | None + else FieldPyType[p] | None if IsSub[ Literal[True], FieldIsDefaultNone[GetInit[p]], ] - else GetType[p] | Default + else FieldPyType[p] | Default ), GetQuals[p], ] @@ -231,7 +244,7 @@ class NoChange: ] type Update[T] = NewProtocol[ *[ - Member[GetName[p], GetType[p] | NoChange, GetQuals[p]] + Member[GetName[p], FieldPyType[p] | NoChange, GetQuals[p]] for p in Iter[Attrs[T]] if not IsSub[ Literal[True], @@ -245,25 +258,31 @@ class NoChange: class User(Table[Literal["users"]]): - id: int = Field(db_type=DbInteger(), primary_key=True, autoincrement=True) - name: str = Field(db_type=DbString(length=50), nullable=False) - email: str = Field( + id: Field[User, int] = column( + db_type=DbInteger(), primary_key=True, autoincrement=True + ) + name: Field[User, str] = column( + db_type=DbString(length=150), nullable=False + ) + email: Field[User, str] = column( db_type=DbString(length=100), unique=True, nullable=False ) - age: int | None = Field(db_type=DbInteger()) - active: bool = Field(db_type=DbBoolean(), default=True) - posts: list[Post] = Field( - db_type=DbLinkSource( - source=ForwardRef("Post"), cardinality=Cardinality.MANY - ) + age: Field[User, int | None] = column(db_type=DbInteger()) + active: Field[User, bool] = column(db_type=DbBoolean(), default=True) + posts: Field[User, list[Post]] = column( + db_type=DbLinkSource(source="Post", cardinality=Cardinality.MANY) ) class Post(Table[Literal["posts"]]): - id: int = Field(db_type=DbInteger(), primary_key=True, autoincrement=True) - content: str = Field(db_type=DbString(length=1000), nullable=False) - author: User = Field( - db_type=DbLinkTarget(target=ForwardRef("User")), nullable=False + id: Field[Post, int] = column( + db_type=DbInteger(), primary_key=True, autoincrement=True + ) + content: Field[Post, str] = column( + db_type=DbString(length=1000), nullable=False + ) + author: Field[Post, User] = column( + db_type=DbLinkTarget(target="User"), nullable=False ) From ec5999457d04b68a16daa30182c0730ee7272959 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Mon, 26 Jan 2026 10:05:23 -0800 Subject: [PATCH 04/15] Add Comments type. --- tests/test_qblike_3.py | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 8d21ac2..def700c 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -52,6 +52,15 @@ author_id INTEGER NOT NULL, FOREIGN KEY (author_id) REFERENCES users (id) ); + +CREATE TABLE comments ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL, + author_id INTEGER NOT NULL, + post_id INTEGER NOT NULL, + FOREIGN KEY (author_id) REFERENCES users (id), + FOREIGN KEY (post_id) REFERENCES posts (id) +); ``` Protocols are generated using AddInit[T], Create[T], and Update[T]. @@ -282,7 +291,25 @@ class Post(Table[Literal["posts"]]): db_type=DbString(length=1000), nullable=False ) author: Field[Post, User] = column( - db_type=DbLinkTarget(target="User"), nullable=False + db_type=DbLinkTarget(target=User), nullable=False + ) + comments: Field[Post, list[Comment]] = column( + db_type=DbLinkSource(source="Comment", cardinality=Cardinality.MANY) + ) + + +class Comment(Table[Literal["comments"]]): + id: Field[Comment, int] = column( + db_type=DbInteger(), primary_key=True, autoincrement=True + ) + content: Field[Comment, str] = column( + db_type=DbString(length=1000), nullable=False + ) + author: Field[Comment, User] = column( + db_type=DbLinkTarget(target=User), nullable=False + ) + post: Field[Comment, Post] = column( + db_type=DbLinkTarget(target=Post), nullable=False ) @@ -311,6 +338,7 @@ class AddInit[tests.test_qblike_3.Post]: id: int content: str author: tests.test_qblike_3.User + comments: list[tests.test_qblike_3.Comment] def __init__(self: Self, *, id: int | tests.test_qblike_3.Default = ..., content: str, author: tests.test_qblike_3.User) -> None: ... """) From 222ae346dfe3a34db478a4e304f09aeca4a1cc70 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Mon, 26 Jan 2026 15:35:44 -0800 Subject: [PATCH 05/15] Add Session, select, Query, QueryRow. --- tests/test_qblike_3.py | 275 ++++++++++++++++++++--------------------- 1 file changed, 133 insertions(+), 142 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index def700c..55b42a7 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -4,18 +4,18 @@ from typing import ( Any, - Callable, ForwardRef, Literal, Never, ReadOnly, - Self, TypedDict, + Unpack, ) -from typemap.type_eval import eval_typing +from typemap.type_eval import eval_call_with_types, eval_typing from typemap.typing import ( Attrs, + Length, GetArg, GetAttr, GetName, @@ -26,7 +26,6 @@ Iter, Member, NewProtocol, - Param, IsSub, ) @@ -67,12 +66,18 @@ """ +# Type Helpers + + type ReplaceNever[T, D] = T if not IsSub[T, Never] else D -type GetFieldItem[T: InitField, K, Default] = ReplaceNever[ +type GetInitFieldItem[T: InitField, K, Default] = ReplaceNever[ GetAttr[GetArg[T, InitField, Literal[0]], K], Default ] +# Database Types + + @dataclasses.dataclass(frozen=True) class DbBoolean: pass @@ -96,7 +101,7 @@ class Table[name: str]: class Field[Table, PyType]: - def __lt__(self, other: Any) -> Filter: ... + def __lt__(self, other: Any) -> Filter[Table]: ... type FieldTable[T] = GetArg[GetType[T], Field, Literal[0]] @@ -116,36 +121,31 @@ class column[Args: ColumnArgs](InitField[Args]): pass -type FieldIsNullable[Init] = GetFieldItem[ +type ColumnInitIsNullable[Init] = GetInitFieldItem[ Init, Literal["nullable"], Literal[True] ] -type FieldIsAutoincrement[Init] = GetFieldItem[ +type ColumnInitIsAutoincrement[Init] = GetInitFieldItem[ Init, Literal["autoincrement"], Literal[False] ] -type FieldHasDefault[Init] = ( +type ColumnInitHasDefault[Init] = ( Literal[True] - if not IsSub[GetFieldItem[Init, Literal["default"], Never], Never] + if not IsSub[GetInitFieldItem[Init, Literal["default"], Never], Never] else Literal[False] ) - -type FieldIsRequiredForCreate[Init] = ( +type FieldValueNeverNull[F, C] = ( Literal[True] - if not IsSub[Literal[True], FieldIsNullable[Init]] - and not IsSub[Literal[True], FieldIsAutoincrement[Init]] - and not IsSub[Literal[True], FieldHasDefault[Init]] - else Literal[False] -) -type FieldIsDefaultNone[Init] = ( - Literal[True] - if IsSub[Literal[True], FieldIsNullable[Init]] - and not IsSub[Literal[True], FieldIsAutoincrement[Init]] - and IsSub[GetFieldItem[Init, Literal["default"], None], None] + if not IsSub[Literal[True], ColumnInitIsNullable[C]] + or IsSub[Literal[True], ColumnInitIsAutoincrement[C]] + or ( + IsSub[FieldPyType[F], list] + and IsSub[GetArg[FieldPyType[F], list, Literal[0]], Table] + ) else Literal[False] ) -class Filter: +class Filter[T: Table]: pass @@ -172,57 +172,10 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): pass -class Default: - pass - - -type InitFnType[T] = Member[ - Literal["__init__"], - Callable[ - [ - Param[Literal["self"], Self], - *[ - Param[ - GetName[p], - ( - FieldPyType[p] - if IsSub[ - Literal[True], FieldIsRequiredForCreate[GetInit[p]] - ] - else FieldPyType[p] | None - if IsSub[Literal[True], FieldIsDefaultNone[GetInit[p]]] - else FieldPyType[p] | Default - ), - ( - Literal["keyword"] - if IsSub[ - Literal[True], FieldIsRequiredForCreate[GetInit[p]] - ] - else Literal["keyword", "default"] - ), - ] - for p in Iter[Attrs[T]] - if not IsSub[ - GetFieldItem[GetInit[p], Literal["db_type"], Never], - DbLinkSource, - ] - ], - ], - None, - ], - Literal["ClassVar"], -] -type AddInit[T] = NewProtocol[ - InitFnType[T], - *[Member[GetName[p], FieldPyType[p], GetQuals[p]] for p in Iter[Attrs[T]]], -] - - -class NoChange: - pass +# Query Types -type Create[T] = NewProtocol[ +type Select[T] = NewProtocol[ *[ Member[ GetName[p], @@ -230,40 +183,64 @@ class NoChange: FieldPyType[p] if IsSub[ Literal[True], - FieldIsRequiredForCreate[GetInit[p]], + FieldValueNeverNull[GetType[p], GetInit[p]], ] else FieldPyType[p] | None - if IsSub[ - Literal[True], - FieldIsDefaultNone[GetInit[p]], - ] - else FieldPyType[p] | Default ), GetQuals[p], ] for p in Iter[Attrs[T]] - if not IsSub[ - Literal[True], - GetFieldItem[GetInit[p], Literal["primary_key"], Never], - ] - and not IsSub[ - GetFieldItem[GetInit[p], Literal["db_type"], Never], DbLinkSource - ] ], ] -type Update[T] = NewProtocol[ - *[ - Member[GetName[p], FieldPyType[p] | NoChange, GetQuals[p]] - for p in Iter[Attrs[T]] - if not IsSub[ - Literal[True], - GetFieldItem[GetInit[p], Literal["primary_key"], Never], - ] - and not IsSub[ - GetFieldItem[GetInit[p], Literal["db_type"], Never], DbLinkSource + + +type AddTable[Tables, New] = ( + Tables + if any(IsSub[t, New] and IsSub[New, t] for t in Iter[Tables]) + else tuple[*[t for t in Iter[Tables]], New] +) +type AddTables[Tables, News] = ( + Tables + if IsSub[Length[News], Literal[0]] + else AddTables[ + AddTable[Tables, GetArg[News, tuple, Literal[0]]], + tuple[*([n for n in Iter[News]][1:])], + ] +) +type UniqueTables[Tables] = AddTables[tuple[()], Tables] + + +def select[*E]( + *entity: Unpack[E], +) -> Query[UniqueTables[tuple[*[e for e in Iter[E]]]]]: ... + + +class Query[E: tuple[type[Table], ...]]: + pass + + +type QueryRow[E: tuple[type[Table], ...]] = ( + Select[GetArg[E, tuple, Literal[0]]] + if IsSub[Literal[1], Length[E]] + else NewProtocol[ + *[ + Member[ + Literal[e.__name__], + Select[e], + ] + for e in Iter[E] ] - ], -] + ] +) + + +class Session: + def execute[E: tuple[type[Table], ...]]( + self, query: Query[E] + ) -> list[QueryRow[E]]: ... + + +# Application Types class User(Table[Literal["users"]]): @@ -277,7 +254,9 @@ class User(Table[Literal["users"]]): db_type=DbString(length=100), unique=True, nullable=False ) age: Field[User, int | None] = column(db_type=DbInteger()) - active: Field[User, bool] = column(db_type=DbBoolean(), default=True) + active: Field[User, bool] = column( + db_type=DbBoolean(), default=True, nullable=False + ) posts: Field[User, list[Post]] = column( db_type=DbLinkSource(source="Post", cardinality=Cardinality.MANY) ) @@ -313,79 +292,91 @@ class Comment(Table[Literal["comments"]]): ) -def test_qblike_3_add_init_01(): - tgt = eval_typing(AddInit[User]) - fmt = format_helper.format_class(tgt) +# Tests + + +def test_qblike_3_select_01(): + # select(User) + query = eval_call_with_types(select, User) + fmt = format_helper.format_class(query) assert fmt == textwrap.dedent("""\ - class AddInit[tests.test_qblike_3.User]: + class Query[tuple[tests.test_qblike_3.User]]: + """) + + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + fmt = format_helper.format_class(result) + assert fmt == textwrap.dedent("""\ + class Select[tests.test_qblike_3.User]: id: int name: str email: str age: int | None active: bool posts: list[tests.test_qblike_3.Post] - def __init__(self: Self, *, id: int | tests.test_qblike_3.Default = ..., name: str, email: str, age: int | None = ..., active: bool | tests.test_qblike_3.Default = ...) -> None: ... """) -def test_qblike_3_add_init_02(): - tgt = eval_typing(AddInit[Post]) - fmt = format_helper.format_class(tgt) +def test_qblike_3_select_02(): + # select(User, User) + query = eval_call_with_types(select, User, User) + fmt = format_helper.format_class(query) assert fmt == textwrap.dedent("""\ - class AddInit[tests.test_qblike_3.Post]: - id: int - content: str - author: tests.test_qblike_3.User - comments: list[tests.test_qblike_3.Comment] - def __init__(self: Self, *, id: int | tests.test_qblike_3.Default = ..., content: str, author: tests.test_qblike_3.User) -> None: ... + class Query[tuple[tests.test_qblike_3.User]]: """) - -def test_qblike_3_create_01(): - tgt = eval_typing(Create[User]) - fmt = format_helper.format_class(tgt) - + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Create[tests.test_qblike_3.User]: + class Select[tests.test_qblike_3.User]: + id: int name: str email: str age: int | None - active: bool | tests.test_qblike_3.Default + active: bool + posts: list[tests.test_qblike_3.Post] """) -def test_qblike_3_create_02(): - tgt = eval_typing(Create[Post]) - fmt = format_helper.format_class(tgt) +def test_qblike_3_select_03(): + # select(User, Post) + query = eval_call_with_types(select, User, Post) + fmt = format_helper.format_class(query) assert fmt == textwrap.dedent("""\ - class Create[tests.test_qblike_3.Post]: - content: str - author: tests.test_qblike_3.User + class Query[tuple[tests.test_qblike_3.User, tests.test_qblike_3.Post]]: """) - -def test_qblike_3_update_01(): - tgt = eval_typing(Update[User]) - fmt = format_helper.format_class(tgt) - + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Update[tests.test_qblike_3.User]: - name: str | tests.test_qblike_3.NoChange - email: str | tests.test_qblike_3.NoChange - age: int | None | tests.test_qblike_3.NoChange - active: bool | tests.test_qblike_3.NoChange + class QueryRow[tuple[tests.test_qblike_3.User, tests.test_qblike_3.Post]]: + User: tests.test_qblike_3.Select[tests.test_qblike_3.User] + Post: tests.test_qblike_3.Select[tests.test_qblike_3.Post] """) + result_user = eval_typing(GetAttr[result, Literal["User"]]) + fmt = format_helper.format_class(result_user) + assert fmt == textwrap.dedent("""\ + class Select[tests.test_qblike_3.User]: + id: int + name: str + email: str + age: int | None + active: bool + posts: list[tests.test_qblike_3.Post] + """) -def test_qblike_3_update_02(): - tgt = eval_typing(Update[Post]) - fmt = format_helper.format_class(tgt) - + result_post = eval_typing(GetAttr[result, Literal["Post"]]) + fmt = format_helper.format_class(result_post) assert fmt == textwrap.dedent("""\ - class Update[tests.test_qblike_3.Post]: - content: str | tests.test_qblike_3.NoChange - author: tests.test_qblike_3.User | tests.test_qblike_3.NoChange + class Select[tests.test_qblike_3.Post]: + id: int + content: str + author: tests.test_qblike_3.User + comments: list[tests.test_qblike_3.Comment] """) From 140282b544b7696acef69fd44ada63526da8618d Mon Sep 17 00:00:00 2001 From: dnwpark Date: Mon, 26 Jan 2026 17:43:19 -0800 Subject: [PATCH 06/15] Select on tables or columns. --- tests/test_qblike_3.py | 324 +++++++++++++++++++++++++++++++---------- 1 file changed, 248 insertions(+), 76 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 55b42a7..edc3828 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -19,7 +19,6 @@ GetArg, GetAttr, GetName, - GetQuals, GetType, GetInit, InitField, @@ -73,6 +72,7 @@ type GetInitFieldItem[T: InitField, K, Default] = ReplaceNever[ GetAttr[GetArg[T, InitField, Literal[0]], K], Default ] +type TypeName[T] = Literal[T.__name__] # Database Types @@ -100,12 +100,13 @@ class Table[name: str]: pass -class Field[Table, PyType]: +class Field[Table, Name, PyType]: def __lt__(self, other: Any) -> Filter[Table]: ... -type FieldTable[T] = GetArg[GetType[T], Field, Literal[0]] -type FieldPyType[T] = GetArg[GetType[T], Field, Literal[1]] +type FieldTable[T] = GetArg[T, Field, Literal[0]] +type FieldName[T] = GetArg[T, Field, Literal[1]] +type FieldPyType[T] = GetArg[T, Field, Literal[2]] class ColumnArgs(TypedDict, total=False): @@ -133,13 +134,13 @@ class column[Args: ColumnArgs](InitField[Args]): else Literal[False] ) -type FieldValueNeverNull[F, C] = ( +type ReadValueNeverNull[M] = ( Literal[True] - if not IsSub[Literal[True], ColumnInitIsNullable[C]] - or IsSub[Literal[True], ColumnInitIsAutoincrement[C]] + if not IsSub[Literal[True], ColumnInitIsNullable[GetInit[M]]] + or IsSub[Literal[True], ColumnInitIsAutoincrement[GetInit[M]]] or ( - IsSub[FieldPyType[F], list] - and IsSub[GetArg[FieldPyType[F], list, Literal[0]], Table] + IsSub[FieldPyType[GetType[M]], list] + and IsSub[GetArg[FieldPyType[GetType[M]], list, Literal[0]], Table] ) else Literal[False] ) @@ -175,119 +176,194 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): # Query Types -type Select[T] = NewProtocol[ - *[ - Member[ - GetName[p], - ( - FieldPyType[p] - if IsSub[ - Literal[True], - FieldValueNeverNull[GetType[p], GetInit[p]], - ] - else FieldPyType[p] | None - ), - GetQuals[p], - ] - for p in Iter[Attrs[T]] - ], +type QueryEntry[T: Table, FieldNames: tuple[Literal[str], ...]] = tuple[ + T, FieldNames ] +type EntryTable[E: QueryEntry] = GetArg[E, tuple, Literal[0]] +type EntryFields[E: QueryEntry] = GetArg[E, tuple, Literal[1]] +type EntryFieldMembers[E: QueryEntry] = tuple[ + *[ + m + for m in Iter[Attrs[EntryTable[E]]] + if any(IsSub[GetName[m], f] for f in Iter[EntryFields[E]]) + ] +] -type AddTable[Tables, New] = ( - Tables - if any(IsSub[t, New] and IsSub[New, t] for t in Iter[Tables]) - else tuple[*[t for t in Iter[Tables]], New] +type EntryIsTable[E: QueryEntry, T: Table] = ( + Literal[True] + if IsSub[EntryTable[E], T] and IsSub[T, EntryTable[E]] + else Literal[False] +) +type EntriesHasTable[Es: tuple[QueryEntry, ...], T: Table] = ( + Literal[True] + if any(IsSub[Literal[True], EntryIsTable[e, T]] for e in Iter[Es]) + else Literal[False] ) -type AddTables[Tables, News] = ( - Tables + +type MakeQueryEntryAllFields[T: Table] = QueryEntry[ + T, + tuple[*[GetName[m] for m in Iter[Attrs[T]] if IsSub[GetType[m], Field]],], +] +type MakeQueryEntryNamedFields[ + T: Table, + FieldNames: tuple[Literal[str], ...], +] = QueryEntry[ + T, + tuple[ + *[ + GetName[m] + for m in Iter[Attrs[T]] + if IsSub[GetType[m], Field] + and any(IsSub[FieldName[GetType[m]], f] for f in Iter[FieldNames]) + ], + ], +] + +type AddTable[Entries, New: Table] = tuple[ + *[ # Existing entries + ( + e + if not IsSub[Literal[True], EntryIsTable[e, New]] + else MakeQueryEntryAllFields[New] + ) + for e in Iter[Entries] + ], + *( # Add entries if not present + [] + if IsSub[Literal[True], EntriesHasTable[Entries, New]] + else [MakeQueryEntryAllFields[New]] + ), +] +type AddField[Entries, New: Field] = tuple[ + *[ # Existing entries + ( + e # Non-matching entry + if not IsSub[Literal[True], EntryIsTable[e, FieldTable[New]]] + else MakeQueryEntryNamedFields[ + EntryTable[e], + tuple[*[f for f in Iter[EntryFields[e]]], FieldName[New]], + ] + ) + for e in Iter[Entries] + ], + *( # Add entries if not present + [] + if IsSub[Literal[True], EntriesHasTable[Entries, FieldTable[New]]] + else [QueryEntry[FieldTable[New], tuple[FieldName[New]]]] + ), +] +type AddEntries[Entries, News: tuple[Table | Field, ...]] = ( + Entries if IsSub[Length[News], Literal[0]] - else AddTables[ - AddTable[Tables, GetArg[News, tuple, Literal[0]]], + else AddEntries[ + ( + AddTable[Entries, GetArg[News, tuple, Literal[0]]] + if IsSub[GetArg[News, tuple, Literal[0]], Table] + else AddField[Entries, GetArg[News, tuple, Literal[0]]] + ), tuple[*([n for n in Iter[News]][1:])], ] ) -type UniqueTables[Tables] = AddTables[tuple[()], Tables] +type UniqueEntries[Entries] = AddEntries[tuple[()], Entries] -def select[*E]( - *entity: Unpack[E], -) -> Query[UniqueTables[tuple[*[e for e in Iter[E]]]]]: ... +def select[*Es]( + *entity: Unpack[Es], +) -> Query[UniqueEntries[tuple[*[e for e in Iter[Es]]]]]: ... -class Query[E: tuple[type[Table], ...]]: +class Query[Es: tuple[QueryEntry[Table, tuple[Member]], ...]]: pass -type QueryRow[E: tuple[type[Table], ...]] = ( - Select[GetArg[E, tuple, Literal[0]]] - if IsSub[Literal[1], Length[E]] +type Select[E] = NewProtocol[ + *[ + Member[ + GetName[m], + ( + FieldPyType[GetType[m]] + if IsSub[ + Literal[True], + ReadValueNeverNull[m], + ] + else FieldPyType[GetType[m]] | None + ), + ] + for m in Iter[EntryFieldMembers[E]] + ], +] + + +type QueryRow[Es: tuple[QueryEntry[Table, tuple[Member]], ...]] = ( + Select[GetArg[Es, tuple, Literal[0]]] + if IsSub[Literal[1], Length[Es]] else NewProtocol[ *[ Member[ - Literal[e.__name__], + TypeName[EntryTable[e]], Select[e], ] - for e in Iter[E] + for e in Iter[Es] ] ] ) class Session: - def execute[E: tuple[type[Table], ...]]( - self, query: Query[E] - ) -> list[QueryRow[E]]: ... + def execute[Es: tuple[type[Table], ...]]( + self, query: Query[Es] + ) -> list[QueryRow[Es]]: ... # Application Types class User(Table[Literal["users"]]): - id: Field[User, int] = column( + id: Field[User, Literal["id"], int] = column( db_type=DbInteger(), primary_key=True, autoincrement=True ) - name: Field[User, str] = column( + name: Field[User, Literal["name"], str] = column( db_type=DbString(length=150), nullable=False ) - email: Field[User, str] = column( + email: Field[User, Literal["email"], str] = column( db_type=DbString(length=100), unique=True, nullable=False ) - age: Field[User, int | None] = column(db_type=DbInteger()) - active: Field[User, bool] = column( + age: Field[User, Literal["age"], int | None] = column(db_type=DbInteger()) + active: Field[User, Literal["active"], bool] = column( db_type=DbBoolean(), default=True, nullable=False ) - posts: Field[User, list[Post]] = column( + posts: Field[User, Literal["posts"], list[Post]] = column( db_type=DbLinkSource(source="Post", cardinality=Cardinality.MANY) ) class Post(Table[Literal["posts"]]): - id: Field[Post, int] = column( + id: Field[Post, Literal["id"], int] = column( db_type=DbInteger(), primary_key=True, autoincrement=True ) - content: Field[Post, str] = column( + content: Field[Post, Literal["content"], str] = column( db_type=DbString(length=1000), nullable=False ) - author: Field[Post, User] = column( + author: Field[Post, Literal["author"], User] = column( db_type=DbLinkTarget(target=User), nullable=False ) - comments: Field[Post, list[Comment]] = column( + comments: Field[Post, Literal["comments"], list[Comment]] = column( db_type=DbLinkSource(source="Comment", cardinality=Cardinality.MANY) ) class Comment(Table[Literal["comments"]]): - id: Field[Comment, int] = column( + id: Field[Comment, Literal["id"], int] = column( db_type=DbInteger(), primary_key=True, autoincrement=True ) - content: Field[Comment, str] = column( + content: Field[Comment, Literal["content"], str] = column( db_type=DbString(length=1000), nullable=False ) - author: Field[Comment, User] = column( + author: Field[Comment, Literal["author"], User] = column( db_type=DbLinkTarget(target=User), nullable=False ) - post: Field[Comment, Post] = column( + post: Field[Comment, Literal["post"], Post] = column( db_type=DbLinkTarget(target=Post), nullable=False ) @@ -295,20 +371,24 @@ class Comment(Table[Literal["comments"]]): # Tests +type AttrNames[T] = tuple[*[GetName[f] for f in Iter[Attrs[T]]]] + + def test_qblike_3_select_01(): # select(User) query = eval_call_with_types(select, User) - fmt = format_helper.format_class(query) + fmt = format_helper.format_class(query) assert fmt == textwrap.dedent("""\ - class Query[tuple[tests.test_qblike_3.User]]: + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]]: """) results = eval_call_with_types(Session.execute, Session, query) result = eval_typing(GetArg[results, list, Literal[0]]) + fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tests.test_qblike_3.User]: + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]: id: int name: str email: str @@ -321,17 +401,18 @@ class Select[tests.test_qblike_3.User]: def test_qblike_3_select_02(): # select(User, User) query = eval_call_with_types(select, User, User) - fmt = format_helper.format_class(query) + fmt = format_helper.format_class(query) assert fmt == textwrap.dedent("""\ - class Query[tuple[tests.test_qblike_3.User]]: + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]]: """) results = eval_call_with_types(Session.execute, Session, query) result = eval_typing(GetArg[results, list, Literal[0]]) + fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tests.test_qblike_3.User]: + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]: id: int name: str email: str @@ -344,25 +425,22 @@ class Select[tests.test_qblike_3.User]: def test_qblike_3_select_03(): # select(User, Post) query = eval_call_with_types(select, User, Post) - fmt = format_helper.format_class(query) + fmt = format_helper.format_class(query) assert fmt == textwrap.dedent("""\ - class Query[tuple[tests.test_qblike_3.User, tests.test_qblike_3.Post]]: + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]], tuple[tests.test_qblike_3.Post, tuple[typing.Literal['id'], typing.Literal['content'], typing.Literal['author'], typing.Literal['comments']]]]]: """) results = eval_call_with_types(Session.execute, Session, query) result = eval_typing(GetArg[results, list, Literal[0]]) - fmt = format_helper.format_class(result) - assert fmt == textwrap.dedent("""\ - class QueryRow[tuple[tests.test_qblike_3.User, tests.test_qblike_3.Post]]: - User: tests.test_qblike_3.Select[tests.test_qblike_3.User] - Post: tests.test_qblike_3.Select[tests.test_qblike_3.Post] - """) + + result_names = eval_typing(AttrNames[result]) + assert result_names == tuple[Literal["User"], Literal["Post"]] result_user = eval_typing(GetAttr[result, Literal["User"]]) fmt = format_helper.format_class(result_user) assert fmt == textwrap.dedent("""\ - class Select[tests.test_qblike_3.User]: + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]: id: int name: str email: str @@ -374,9 +452,103 @@ class Select[tests.test_qblike_3.User]: result_post = eval_typing(GetAttr[result, Literal["Post"]]) fmt = format_helper.format_class(result_post) assert fmt == textwrap.dedent("""\ - class Select[tests.test_qblike_3.Post]: + class Select[tuple[tests.test_qblike_3.Post, tuple[typing.Literal['id'], typing.Literal['content'], typing.Literal['author'], typing.Literal['comments']]]]: id: int content: str author: tests.test_qblike_3.User comments: list[tests.test_qblike_3.Comment] """) + + +def test_qblike_3_select_04(): + # select(User.name) + user_name = eval_typing(GetAttr[User, Literal["name"]]) + query = eval_call_with_types(select, user_name) + + fmt = format_helper.format_class(query) + assert fmt == textwrap.dedent("""\ + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]]: + """) + + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + + fmt = format_helper.format_class(result) + assert fmt == textwrap.dedent("""\ + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]: + name: str + """) + + +def test_qblike_3_select_05(): + # select(User.name, User.name) + user_name = eval_typing(GetAttr[User, Literal["name"]]) + query = eval_call_with_types(select, user_name, user_name) + + fmt = format_helper.format_class(query) + assert fmt == textwrap.dedent("""\ + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]]: + """) + + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + + fmt = format_helper.format_class(result) + assert fmt == textwrap.dedent("""\ + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]: + name: str + """) + + +def test_qblike_3_select_06(): + # select(User.name, User.email) + user_name = eval_typing(GetAttr[User, Literal["name"]]) + user_email = eval_typing(GetAttr[User, Literal["email"]]) + query = eval_call_with_types(select, user_name, user_email) + + fmt = format_helper.format_class(query) + assert fmt == textwrap.dedent("""\ + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name'], typing.Literal['email']]]]]: + """) + + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + + fmt = format_helper.format_class(result) + assert fmt == textwrap.dedent("""\ + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name'], typing.Literal['email']]]]: + name: str + email: str + """) + + +def test_qblike_3_select_07(): + # select(User.name, Post.content) + user_name = eval_typing(GetAttr[User, Literal["name"]]) + post_content = eval_typing(GetAttr[Post, Literal["content"]]) + query = eval_call_with_types(select, user_name, post_content) + + fmt = format_helper.format_class(query) + assert fmt == textwrap.dedent("""\ + class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]], tuple[tests.test_qblike_3.Post, tuple[typing.Literal['content']]]]]: + """) + + results = eval_call_with_types(Session.execute, Session, query) + result = eval_typing(GetArg[results, list, Literal[0]]) + + result_names = eval_typing(AttrNames[result]) + assert result_names == tuple[Literal["User"], Literal["Post"]] + + result_user = eval_typing(GetAttr[result, Literal["User"]]) + fmt = format_helper.format_class(result_user) + assert fmt == textwrap.dedent("""\ + class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]: + name: str + """) + + result_post = eval_typing(GetAttr[result, Literal["Post"]]) + fmt = format_helper.format_class(result_post) + assert fmt == textwrap.dedent("""\ + class Select[tuple[tests.test_qblike_3.Post, tuple[typing.Literal['content']]]]: + content: str + """) From c08841144b9f55bfe93bf8a390a47b0e7de6e467 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Mon, 26 Jan 2026 17:55:22 -0800 Subject: [PATCH 07/15] Shorten display name of Select type alias. --- tests/test_qblike_3.py | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index edc3828..c56d008 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -182,11 +182,11 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): type EntryTable[E: QueryEntry] = GetArg[E, tuple, Literal[0]] type EntryFields[E: QueryEntry] = GetArg[E, tuple, Literal[1]] -type EntryFieldMembers[E: QueryEntry] = tuple[ +type EntryFieldMembers[T: Table, FieldNames: tuple[Literal[str], ...]] = tuple[ *[ m - for m in Iter[Attrs[EntryTable[E]]] - if any(IsSub[GetName[m], f] for f in Iter[EntryFields[E]]) + for m in Iter[Attrs[T]] + if any(IsSub[GetName[m], f] for f in Iter[FieldNames]) ] ] @@ -277,7 +277,7 @@ class Query[Es: tuple[QueryEntry[Table, tuple[Member]], ...]]: pass -type Select[E] = NewProtocol[ +type Select[T: Table, FieldNames: tuple[Literal[str], ...]] = NewProtocol[ *[ Member[ GetName[m], @@ -290,19 +290,22 @@ class Query[Es: tuple[QueryEntry[Table, tuple[Member]], ...]]: else FieldPyType[GetType[m]] | None ), ] - for m in Iter[EntryFieldMembers[E]] + for m in Iter[EntryFieldMembers[T, FieldNames]] ], ] type QueryRow[Es: tuple[QueryEntry[Table, tuple[Member]], ...]] = ( - Select[GetArg[Es, tuple, Literal[0]]] + Select[ + EntryTable[GetArg[Es, tuple, Literal[0]]], + EntryFields[GetArg[Es, tuple, Literal[0]]], + ] if IsSub[Literal[1], Length[Es]] else NewProtocol[ *[ Member[ TypeName[EntryTable[e]], - Select[e], + Select[EntryTable[e], EntryFields[e]], ] for e in Iter[Es] ] @@ -388,7 +391,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], ty fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]: id: int name: str email: str @@ -412,7 +415,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], ty fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]: id: int name: str email: str @@ -440,7 +443,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], ty result_user = eval_typing(GetAttr[result, Literal["User"]]) fmt = format_helper.format_class(result_user) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]: id: int name: str email: str @@ -452,7 +455,7 @@ class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing. result_post = eval_typing(GetAttr[result, Literal["Post"]]) fmt = format_helper.format_class(result_post) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.Post, tuple[typing.Literal['id'], typing.Literal['content'], typing.Literal['author'], typing.Literal['comments']]]]: + class Select[tests.test_qblike_3.Post, tuple[typing.Literal['id'], typing.Literal['content'], typing.Literal['author'], typing.Literal['comments']]]: id: int content: str author: tests.test_qblike_3.User @@ -475,7 +478,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]] fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['name']]]: name: str """) @@ -495,7 +498,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]] fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['name']]]: name: str """) @@ -516,7 +519,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name'], fmt = format_helper.format_class(result) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name'], typing.Literal['email']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['name'], typing.Literal['email']]]: name: str email: str """) @@ -542,13 +545,13 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]] result_user = eval_typing(GetAttr[result, Literal["User"]]) fmt = format_helper.format_class(result_user) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]]]: + class Select[tests.test_qblike_3.User, tuple[typing.Literal['name']]]: name: str """) result_post = eval_typing(GetAttr[result, Literal["Post"]]) fmt = format_helper.format_class(result_post) assert fmt == textwrap.dedent("""\ - class Select[tuple[tests.test_qblike_3.Post, tuple[typing.Literal['content']]]]: + class Select[tests.test_qblike_3.Post, tuple[typing.Literal['content']]]: content: str """) From 0fa314a2bdece6adfa395ece9f28a0db11b51d22 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Mon, 26 Jan 2026 17:55:31 -0800 Subject: [PATCH 08/15] Update comment. --- tests/test_qblike_3.py | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index c56d008..14d8886 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -34,7 +34,7 @@ """ An example of a SQL-Alchemy like ORM. -The User and Post classes model a SQLite schema: +The User, Post, and Comment classes model a SQLite schema: ``` CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -61,7 +61,41 @@ ); ``` -Protocols are generated using AddInit[T], Create[T], and Update[T]. +Users can query using the `select` function, which generates a `Query` object +with the specified tables and fields. + +Users can then execute the query using `Session.execute`, which returns a +list of `QueryRow` objects. + +If a single table is selected, the `QueryRow` object will contain the selected +fields. + +For example, `select(User)` will return a list of: + + class Select[User, tuple[...]]: + id: int + name: str + email: str + age: int | None + active: bool + posts: list[Post] + +If multiple tables are selected, the `QueryRow` object will contain a field for +each table, which in turn contains the selected fields. + +For example, `select(User.name, Post.content)` will return a list of: + + class QueryRow[...]: + User: Select[User, tuple[...]]]: + Post: Select[Post, tuple[...]]]: + + where, + + class Select[User, tuple[...]]: + name: str + + class Select[Post, tuple[...]]: + content: str """ From 5ea4d76a26c5659abfd5bf0ce189fa44a705ea5d Mon Sep 17 00:00:00 2001 From: dnwpark Date: Tue, 27 Jan 2026 08:03:00 -0800 Subject: [PATCH 09/15] Expand DbType. --- tests/test_qblike_3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 14d8886..fd5a601 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -127,7 +127,7 @@ class DbString: length: int -type DbType = DbInteger | DbString +type DbType = DbBoolean | DbInteger | DbString | DbLinkTarget | DbLinkSource class Table[name: str]: From 26d7b9540cf9329a16edeb011cc102b3e9753a00 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 29 Jan 2026 08:34:47 -0800 Subject: [PATCH 10/15] Use Slice. --- tests/test_qblike_3.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index fd5a601..176797c 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -22,10 +22,11 @@ GetType, GetInit, InitField, + IsSub, Iter, Member, NewProtocol, - IsSub, + Slice, ) from . import format_helper @@ -296,7 +297,7 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): if IsSub[GetArg[News, tuple, Literal[0]], Table] else AddField[Entries, GetArg[News, tuple, Literal[0]]] ), - tuple[*([n for n in Iter[News]][1:])], + Slice[News, Literal[1], Literal[None]], ] ) type UniqueEntries[Entries] = AddEntries[tuple[()], Entries] From 73fa3e2024fd17aa006247a154263acf6d310f3f Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 29 Jan 2026 08:45:21 -0800 Subject: [PATCH 11/15] Use Bool. --- tests/test_qblike_3.py | 42 ++++++++++++++---------------------------- 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 176797c..b51d9be 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -15,6 +15,7 @@ from typemap.type_eval import eval_call_with_types, eval_typing from typemap.typing import ( Attrs, + Bool, Length, GetArg, GetAttr, @@ -163,21 +164,17 @@ class column[Args: ColumnArgs](InitField[Args]): type ColumnInitIsAutoincrement[Init] = GetInitFieldItem[ Init, Literal["autoincrement"], Literal[False] ] -type ColumnInitHasDefault[Init] = ( - Literal[True] - if not IsSub[GetInitFieldItem[Init, Literal["default"], Never], Never] - else Literal[False] -) +type ColumnInitHasDefault[Init] = not IsSub[ + GetInitFieldItem[Init, Literal["default"], Never], Never +] type ReadValueNeverNull[M] = ( - Literal[True] - if not IsSub[Literal[True], ColumnInitIsNullable[GetInit[M]]] - or IsSub[Literal[True], ColumnInitIsAutoincrement[GetInit[M]]] + not Bool[ColumnInitIsNullable[GetInit[M]]] + or Bool[ColumnInitIsAutoincrement[GetInit[M]]] or ( IsSub[FieldPyType[GetType[M]], list] and IsSub[GetArg[FieldPyType[GetType[M]], list, Literal[0]], Table] ) - else Literal[False] ) @@ -226,14 +223,10 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): ] type EntryIsTable[E: QueryEntry, T: Table] = ( - Literal[True] - if IsSub[EntryTable[E], T] and IsSub[T, EntryTable[E]] - else Literal[False] + IsSub[EntryTable[E], T] and IsSub[T, EntryTable[E]] ) -type EntriesHasTable[Es: tuple[QueryEntry, ...], T: Table] = ( - Literal[True] - if any(IsSub[Literal[True], EntryIsTable[e, T]] for e in Iter[Es]) - else Literal[False] +type EntriesHasTable[Es: tuple[QueryEntry, ...], T: Table] = any( + Bool[EntryIsTable[e, T]] for e in Iter[Es] ) type MakeQueryEntryAllFields[T: Table] = QueryEntry[ @@ -257,16 +250,12 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): type AddTable[Entries, New: Table] = tuple[ *[ # Existing entries - ( - e - if not IsSub[Literal[True], EntryIsTable[e, New]] - else MakeQueryEntryAllFields[New] - ) + (e if not Bool[EntryIsTable[e, New]] else MakeQueryEntryAllFields[New]) for e in Iter[Entries] ], *( # Add entries if not present [] - if IsSub[Literal[True], EntriesHasTable[Entries, New]] + if Bool[EntriesHasTable[Entries, New]] else [MakeQueryEntryAllFields[New]] ), ] @@ -274,7 +263,7 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): *[ # Existing entries ( e # Non-matching entry - if not IsSub[Literal[True], EntryIsTable[e, FieldTable[New]]] + if not Bool[EntryIsTable[e, FieldTable[New]]] else MakeQueryEntryNamedFields[ EntryTable[e], tuple[*[f for f in Iter[EntryFields[e]]], FieldName[New]], @@ -284,7 +273,7 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): ], *( # Add entries if not present [] - if IsSub[Literal[True], EntriesHasTable[Entries, FieldTable[New]]] + if Bool[EntriesHasTable[Entries, FieldTable[New]]] else [QueryEntry[FieldTable[New], tuple[FieldName[New]]]] ), ] @@ -318,10 +307,7 @@ class Query[Es: tuple[QueryEntry[Table, tuple[Member]], ...]]: GetName[m], ( FieldPyType[GetType[m]] - if IsSub[ - Literal[True], - ReadValueNeverNull[m], - ] + if Bool[ReadValueNeverNull[m]] else FieldPyType[GetType[m]] | None ), ] From a701cf1159056d3c6bcff483c284a455f8bd74ec Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 29 Jan 2026 08:47:06 -0800 Subject: [PATCH 12/15] Use Matches. --- tests/test_qblike_3.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index b51d9be..88887cf 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -25,6 +25,7 @@ InitField, IsSub, Iter, + Matches, Member, NewProtocol, Slice, @@ -222,9 +223,7 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): ] ] -type EntryIsTable[E: QueryEntry, T: Table] = ( - IsSub[EntryTable[E], T] and IsSub[T, EntryTable[E]] -) +type EntryIsTable[E: QueryEntry, T: Table] = Matches[EntryTable[E], T] type EntriesHasTable[Es: tuple[QueryEntry, ...], T: Table] = any( Bool[EntryIsTable[e, T]] for e in Iter[Es] ) From 7bead0c30af2535e182b4a4348e296ec7df73c91 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 29 Jan 2026 08:48:32 -0800 Subject: [PATCH 13/15] Fix syntax. --- tests/test_qblike_3.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 88887cf..8f99542 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -271,9 +271,9 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): for e in Iter[Entries] ], *( # Add entries if not present - [] - if Bool[EntriesHasTable[Entries, FieldTable[New]]] - else [QueryEntry[FieldTable[New], tuple[FieldName[New]]]] + e + for e in Iter[tuple[QueryEntry[FieldTable[New], tuple[FieldName[New]]]]] + if not Bool[EntriesHasTable[Entries, FieldTable[New]]] ), ] type AddEntries[Entries, News: tuple[Table | Field, ...]] = ( From 43271e00c3572ce0a93f1db162043ddd66572802 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Thu, 29 Jan 2026 09:12:32 -0800 Subject: [PATCH 14/15] Simplify syntax. --- tests/test_qblike_3.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index 8f99542..f1e035a 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -291,9 +291,7 @@ class DbLinkSource[Args: DbLinkSourceArgs](InitField[Args]): type UniqueEntries[Entries] = AddEntries[tuple[()], Entries] -def select[*Es]( - *entity: Unpack[Es], -) -> Query[UniqueEntries[tuple[*[e for e in Iter[Es]]]]]: ... +def select[*Es](*entity: Unpack[Es]) -> Query[UniqueEntries[Es]]: ... class Query[Es: tuple[QueryEntry[Table, tuple[Member]], ...]]: From f40dd6eaa02b61b3473a0c577f76422f46d1ff74 Mon Sep 17 00:00:00 2001 From: dnwpark Date: Fri, 30 Jan 2026 14:26:47 -0800 Subject: [PATCH 15/15] Use GetMemberType and GetSpecialAttr. --- tests/test_qblike_3.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_qblike_3.py b/tests/test_qblike_3.py index f1e035a..77b6221 100644 --- a/tests/test_qblike_3.py +++ b/tests/test_qblike_3.py @@ -18,8 +18,9 @@ Bool, Length, GetArg, - GetAttr, + GetMemberType, GetName, + GetSpecialAttr, GetType, GetInit, InitField, @@ -107,9 +108,8 @@ class Select[Post, tuple[...]]: type ReplaceNever[T, D] = T if not IsSub[T, Never] else D type GetInitFieldItem[T: InitField, K, Default] = ReplaceNever[ - GetAttr[GetArg[T, InitField, Literal[0]], K], Default + GetMemberType[GetArg[T, InitField, Literal[0]], K], Default ] -type TypeName[T] = Literal[T.__name__] # Database Types @@ -322,7 +322,7 @@ class Query[Es: tuple[QueryEntry[Table, tuple[Member]], ...]]: else NewProtocol[ *[ Member[ - TypeName[EntryTable[e]], + GetSpecialAttr[EntryTable[e], Literal["__name__"]], Select[EntryTable[e], EntryFields[e]], ] for e in Iter[Es] @@ -458,7 +458,7 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['id'], ty result_names = eval_typing(AttrNames[result]) assert result_names == tuple[Literal["User"], Literal["Post"]] - result_user = eval_typing(GetAttr[result, Literal["User"]]) + result_user = eval_typing(GetMemberType[result, Literal["User"]]) fmt = format_helper.format_class(result_user) assert fmt == textwrap.dedent("""\ class Select[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Literal['name'], typing.Literal['email'], typing.Literal['age'], typing.Literal['active'], typing.Literal['posts']]]: @@ -470,7 +470,7 @@ class Select[tests.test_qblike_3.User, tuple[typing.Literal['id'], typing.Litera posts: list[tests.test_qblike_3.Post] """) - result_post = eval_typing(GetAttr[result, Literal["Post"]]) + result_post = eval_typing(GetMemberType[result, Literal["Post"]]) fmt = format_helper.format_class(result_post) assert fmt == textwrap.dedent("""\ class Select[tests.test_qblike_3.Post, tuple[typing.Literal['id'], typing.Literal['content'], typing.Literal['author'], typing.Literal['comments']]]: @@ -483,7 +483,7 @@ class Select[tests.test_qblike_3.Post, tuple[typing.Literal['id'], typing.Litera def test_qblike_3_select_04(): # select(User.name) - user_name = eval_typing(GetAttr[User, Literal["name"]]) + user_name = eval_typing(GetMemberType[User, Literal["name"]]) query = eval_call_with_types(select, user_name) fmt = format_helper.format_class(query) @@ -503,7 +503,7 @@ class Select[tests.test_qblike_3.User, tuple[typing.Literal['name']]]: def test_qblike_3_select_05(): # select(User.name, User.name) - user_name = eval_typing(GetAttr[User, Literal["name"]]) + user_name = eval_typing(GetMemberType[User, Literal["name"]]) query = eval_call_with_types(select, user_name, user_name) fmt = format_helper.format_class(query) @@ -523,8 +523,8 @@ class Select[tests.test_qblike_3.User, tuple[typing.Literal['name']]]: def test_qblike_3_select_06(): # select(User.name, User.email) - user_name = eval_typing(GetAttr[User, Literal["name"]]) - user_email = eval_typing(GetAttr[User, Literal["email"]]) + user_name = eval_typing(GetMemberType[User, Literal["name"]]) + user_email = eval_typing(GetMemberType[User, Literal["email"]]) query = eval_call_with_types(select, user_name, user_email) fmt = format_helper.format_class(query) @@ -545,8 +545,8 @@ class Select[tests.test_qblike_3.User, tuple[typing.Literal['name'], typing.Lite def test_qblike_3_select_07(): # select(User.name, Post.content) - user_name = eval_typing(GetAttr[User, Literal["name"]]) - post_content = eval_typing(GetAttr[Post, Literal["content"]]) + user_name = eval_typing(GetMemberType[User, Literal["name"]]) + post_content = eval_typing(GetMemberType[Post, Literal["content"]]) query = eval_call_with_types(select, user_name, post_content) fmt = format_helper.format_class(query) @@ -560,14 +560,14 @@ class Query[tuple[tuple[tests.test_qblike_3.User, tuple[typing.Literal['name']]] result_names = eval_typing(AttrNames[result]) assert result_names == tuple[Literal["User"], Literal["Post"]] - result_user = eval_typing(GetAttr[result, Literal["User"]]) + result_user = eval_typing(GetMemberType[result, Literal["User"]]) fmt = format_helper.format_class(result_user) assert fmt == textwrap.dedent("""\ class Select[tests.test_qblike_3.User, tuple[typing.Literal['name']]]: name: str """) - result_post = eval_typing(GetAttr[result, Literal["Post"]]) + result_post = eval_typing(GetMemberType[result, Literal["Post"]]) fmt = format_helper.format_class(result_post) assert fmt == textwrap.dedent("""\ class Select[tests.test_qblike_3.Post, tuple[typing.Literal['content']]]: