From 41a59fe1ed4574b61b036274a6b0b92b649658a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=85=E7=AB=8B=E4=B8=9A=EF=BC=88Chris=20Fu=EF=BC=89?= <17433201@qq.com> Date: Tue, 23 Sep 2025 15:14:11 +0800 Subject: [PATCH 1/2] Update `pyproject.toml` --- pyproject.toml | 3 +-- runtime_keypath/__init__.py | 6 +++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1748fa1..5e7927f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,6 @@ build-backend = "setuptools.build_meta" [project] name = "runtime-keypath" -version = "0.1.7" authors = [{ name = "Chris Fu", email = "17433201@qq.com" }] description = "Supports runtime key-path recording/accessing for Python." classifiers = [ @@ -17,7 +16,7 @@ classifiers = [ readme = "README.md" license = { file = "LICENSE" } requires-python = ">=3.8" -dependencies = ["typing_extensions >= 4.9"] +dynamic = ["version"] [project.urls] Homepage = "https://github.com/Azureblade3808/py-runtime-keypath" diff --git a/runtime_keypath/__init__.py b/runtime_keypath/__init__.py index 6eb8646..9735f8d 100644 --- a/runtime_keypath/__init__.py +++ b/runtime_keypath/__init__.py @@ -1 +1,5 @@ -from ._core import * +__version__ = "0.2.0" + +###### + +from ._core import * From be78026e0cd75c7e4a5a57a6359f0a6261eae39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=85=E7=AB=8B=E4=B8=9A=EF=BC=88Chris=20Fu=EF=BC=89?= <17433201@qq.com> Date: Wed, 24 Sep 2025 10:11:20 +0800 Subject: [PATCH 2/2] Rework almost everything - Adopt dynamic versioning for project according to `runtime_keypath.__version__`. - Add settings for Pyright, Black, iSort, pytest in `pyproject.toml`. - Separate implementation of `KeyPath` into "sugarful" and "sugarless" parts, while leaving the "sugarful" part undone. --- pyproject.toml | 119 ++++ runtime_keypath/__init__.py | 2 + .../{_core_test.py => __test__.py} | 2 +- runtime_keypath/__typecheck__.py | 51 ++ runtime_keypath/_core.py | 510 ++++-------------- runtime_keypath/_sugarful.py | 59 ++ runtime_keypath/_sugarless.py | 363 +++++++++++++ 7 files changed, 687 insertions(+), 419 deletions(-) rename runtime_keypath/{_core_test.py => __test__.py} (99%) create mode 100644 runtime_keypath/__typecheck__.py create mode 100644 runtime_keypath/_sugarful.py create mode 100644 runtime_keypath/_sugarless.py diff --git a/pyproject.toml b/pyproject.toml index 5e7927f..c8d2a40 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,3 +21,122 @@ dynamic = ["version"] [project.urls] Homepage = "https://github.com/Azureblade3808/py-runtime-keypath" "Bug Tracker" = "https://github.com/Azureblade3808/py-runtime-keypath/issues" + +[tool.isort] +profile = "black" + +case_sensitive = true +combine_as_imports = true +reverse_relative = true +star_first = true +treat_all_comments_as_code = true + +[tool.pyright] + +# region[Main Configuration Options] +include = ["**/*.py", "**/*.pyi"] + +pythonPlatform = "All" + +typeCheckingMode = "strict" +useLibraryCodeForTypes = false +# endregion + +# region[Type Check Diagnostics Settings] +analyzeUnannotatedFunctions = true +strictParameterNoneValue = false +enableTypeIgnoreComments = true +disableBytesTypePromotions = true +strictListInference = false +strictDictionaryInference = false +strictSetInference = false +deprecateTypingAliases = true +enableExperimentalFeatures = false +reportMissingTypeStubs = "none" +reportMissingModuleSource = "error" +reportInvalidTypeForm = "error" +reportMissingImports = "error" +reportUndefinedVariable = "error" +reportAssertAlwaysTrue = "error" +reportInvalidStringEscapeSequence = "error" +reportInvalidTypeVarUse = "error" +reportSelfClsParameterName = "warning" +reportUnsupportedDunderAll = "warning" +reportUnusedExpression = "none" +reportWildcardImportFromLibrary = "warning" +reportAbstractUsage = "error" +reportArgumentType = "error" +reportAssertTypeFailure = "error" +reportAssignmentType = "error" +reportAttributeAccessIssue = "error" +reportCallIssue = "error" +reportGeneralTypeIssues = "error" +reportInconsistentOverload = "error" +reportIndexIssue = "error" +reportInvalidTypeArguments = "error" +reportNoOverloadImplementation = "error" +reportOperatorIssue = "error" +reportOptionalSubscript = "error" +reportOptionalMemberAccess = "error" +reportOptionalCall = "error" +reportOptionalIterable = "error" +reportOptionalContextManager = "error" +reportOptionalOperand = "error" +reportRedeclaration = "error" +reportReturnType = "error" +reportTypedDictNotRequiredAccess = "error" +reportPrivateImportUsage = "error" +reportUnboundVariable = "error" +reportUnhashable = "error" +reportUnusedCoroutine = "error" +reportUnusedExcept = "error" +reportFunctionMemberAccess = "warning" +reportIncompatibleMethodOverride = "error" +reportIncompatibleVariableOverride = "error" +reportOverlappingOverload = "none" +reportPossiblyUnboundVariable = "error" +reportConstantRedefinition = "error" +reportDeprecated = "warning" +reportDuplicateImport = "error" +reportIncompleteStub = "warning" +reportInconsistentConstructor = "error" +reportInvalidStubStatement = "error" +reportMatchNotExhaustive = "error" +reportMissingParameterType = "none" +reportMissingTypeArgument = "none" +reportPrivateUsage = "none" +reportTypeCommentUsage = "error" +reportUnknownArgumentType = "none" +reportUnknownLambdaType = "none" +reportUnknownMemberType = "none" +reportUnknownParameterType = "none" +reportUnknownVariableType = "none" +reportUnnecessaryCast = "none" +reportUnnecessaryComparison = "none" +reportUnnecessaryContains = "none" +reportUnnecessaryIsInstance = "none" +reportUnusedClass = "warning" +reportUnusedImport = "warning" +reportUnusedFunction = "warning" +reportUnusedVariable = "warning" +reportUntypedBaseClass = "warning" +reportUntypedClassDecorator = "none" +reportUntypedFunctionDecorator = "none" +reportUntypedNamedTuple = "warning" +reportCallInDefaultInitializer = "warning" +reportImplicitOverride = "none" +reportImplicitStringConcatenation = "warning" +reportImportCycles = "none" +reportMissingSuperCall = "none" +reportPropertyTypeMismatch = "none" +reportShadowedImports = "warning" +reportUninitializedInstanceVariable = "none" +reportUnnecessaryTypeIgnoreComment = "none" +reportUnusedCallResult = "error" +# endregion + +[tool.pytest.ini_options] +python_files = ["**/__test__.py", "**/*_test.py"] + +[tool.setuptools.dynamic] +version = { attr = "runtime_keypath.__version__" } diff --git a/runtime_keypath/__init__.py b/runtime_keypath/__init__.py index 9735f8d..7501ae1 100644 --- a/runtime_keypath/__init__.py +++ b/runtime_keypath/__init__.py @@ -3,3 +3,5 @@ ###### from ._core import * +from ._sugarful import * +from ._sugarless import * diff --git a/runtime_keypath/_core_test.py b/runtime_keypath/__test__.py similarity index 99% rename from runtime_keypath/_core_test.py rename to runtime_keypath/__test__.py index 66ab214..33fe713 100644 --- a/runtime_keypath/_core_test.py +++ b/runtime_keypath/__test__.py @@ -6,7 +6,7 @@ import pytest -from ._core import * +from . import * class Tests__KeyPathSupporting: diff --git a/runtime_keypath/__typecheck__.py b/runtime_keypath/__typecheck__.py new file mode 100644 index 0000000..aa84882 --- /dev/null +++ b/runtime_keypath/__typecheck__.py @@ -0,0 +1,51 @@ +""" +Makes sure that type-checking works as expected. +""" + +# pyright: reportUnnecessaryTypeIgnoreComment = true + +###### + +from __future__ import annotations + +__all__ = [] + +###### + +from typing import Any + +from typing_extensions import assert_type + +from . import * + +###### + + +class Check__KeyPath: + @staticmethod + def check__sugarful(any_: Any, /) -> None: + class A: + b: B + + class B: + c: C + + class C: + pass + + a: A = any_ + __ = assert_type(KeyPath[C] or a.b.c, "KeyPath[C]") + + @staticmethod + def check__sugarless(any_: Any, /) -> None: + class A: + b: B + + class B: + c: C + + class C: + pass + + a: A = any_ + __ = assert_type(KeyPath.of(a.b.c), "KeyPath[C]") diff --git a/runtime_keypath/_core.py b/runtime_keypath/_core.py index d5d0ac0..6f283fb 100644 --- a/runtime_keypath/_core.py +++ b/runtime_keypath/_core.py @@ -1,418 +1,92 @@ -from __future__ import annotations - -__all__ = [ - "KeyPath", - "KeyPathSupporting", - "key_path_supporting", -] - -import threading -from collections.abc import Sequence -from typing import ( - TYPE_CHECKING, - Any, - Final, - Generic, - Protocol, - TypeVar, - cast, - final, -) - -try: - from typing_extensions import deprecated, override -except ImportError: - if TYPE_CHECKING: - assert False - else: - deprecated = lambda *args, **kwargs: lambda x: x - override = lambda x: x - -_T = TypeVar("_T") - -_Value_co = TypeVar("_Value_co", covariant=True) -_Value_0 = TypeVar("_Value_0") - -_MISSING = cast("Any", object()) - - -@final -class _KeyPathRecorder: - __slots__ = ("busy", "start", "end", "key_list") - - busy: bool - start: Any - end: Any - key_list: list[str] - - def __init__(self, /) -> None: - self.busy = False - self.start = _MISSING - self.end = _MISSING - self.key_list = [] - - -class _ThreadLocalProtocol(Protocol): - recorder: _KeyPathRecorder - """ - The active key-path recorder for this thread. May not exist. - """ - - -_thread_local = cast("_ThreadLocalProtocol", threading.local()) - - -@final -class _KeyPathMeta(type): - """ - The metaclass for class `KeyPath`. - - It exists mainly to provide `KeyPath.of` as a property. - """ - - # ! `of` is provided as a property here, so that whenever `KeyPath.of` gets - # ! accessed, we can do something before it actually gets called. - @property - def of(self, /) -> _KeyPathOfFunction: - # ! Docstring here is for Pylance hint. - """ - Returns the key-path for accessing a certain value from a target - object with a key sequence such as `a.b.c`. - - The target object and all intermediate objects, except for the - final value, are expected to subclass `KeyPathSupporting`. - - Parameters - ---------- - `value` - A value that is accessed with chained keys such as `a.b.c`. - - Returns - ------- - A key-path that indicates the target object and the key sequence - to access the given value. - - Raises - ------ - `RuntimeError` - Typically occurs when the target or an intermediate object - isn't subclassing `KeyPathSupporting`. Check the error - message for more details. - - Example - ------- - >>> class A(KeyPathSupporting): - ... def __init__(self) -> None: - ... self.b = B() - >>> @key_path_supporting - ... class B: - ... def __init__(self) -> None: - ... self.c = C() - >>> class C: - ... pass - >>> a = A() - >>> key_path = KeyPath.of(a.b.c) - >>> assert key_path.base is a - >>> assert key_path.keys == ("b", "c") - """ - - try: - _ = _thread_local.recorder - except AttributeError: - pass - else: - raise RuntimeError( - " ".join( - [ - "An unfinished key-path recorder has been found.", - "Check if `KeyPath.of` is always called immediatelly.", - ] - ) - ) - - recorder = _KeyPathRecorder() - _thread_local.recorder = recorder - - func = _KeyPathOfFunction() - return func - - -@final -class KeyPath(Generic[_Value_co], metaclass=_KeyPathMeta): - """ - An object that stands for a member chain from a base object. - """ - - __base: Final[Any] - __keys: Final[Sequence[str]] - - def __init__(self, /, target: Any, keys: str | Sequence[str]) -> None: - self.__base = target - - if isinstance(keys, str): - keys = tuple(keys.split(".")) - else: - keys = tuple(keys) - self.__keys = keys - - @property - def base(self, /) -> Any: - return self.__base - - @property - def keys(self, /) -> Sequence[str]: - return self.__keys - - def get(self, /) -> _Value_co: - value = self.__base - for key in self.__keys: - value = getattr(value, key) - return value - - def unsafe_set(self: KeyPath[_Value_0], value: _Value_0, /) -> None: - target = self.__base - keys = self.__keys - i_last_key = len(keys) - 1 - for i in range(i_last_key): - target = getattr(target, keys[i]) - setattr(target, keys[i_last_key], value) - - @deprecated("`KeyPath.set` is deprecated. Use `KeyPath.unsafe_set` instead.") - def set(self: KeyPath[_Value_0], value: _Value_0, /) -> None: - return self.unsafe_set(value) - - @override - def __hash__(self, /) -> int: - return hash((self.base, self.keys)) - - @override - def __eq__(self, other: object, /) -> bool: - return ( - isinstance(other, KeyPath) - and self.base is other.base - and self.keys == other.keys - ) - - @override - def __repr__(self, /) -> str: - type_name = type(self).__name__ - base = self.base - keys = self.keys - return f"{type_name}({base=!r}, {keys=!r})" - - def __call__(self, /) -> _Value_co: - return self.get() - - -# ! We implement the result of `KeyPath.of` as a callable object, so that when an -# ! exception occurred during the key-path access, there would still be a chance to -# ! perform some finalization. -class _KeyPathOfFunction: - # ! Docstring here is for runtime help. - """ - Returns the key-path for accessing a certain value from a target - object with a key sequence such as `a.b.c`. - - The target object and all intermediate objects, except for the - final value, are expected to subclass `KeyPathSupporting`. - - Parameters - ---------- - `value` - A value that is accessed with chained keys such as `a.b.c`. - - Returns - ------- - A key-path that indicates the target object and the key sequence to - access the given value. - - Raises - ------ - `RuntimeError` - Typically occurs when the target or an intermediate object isn't - subclassing `KeyPathSupporting`. Check the error message for - more details. - - Example - ------- - >>> class A(KeyPathSupporting): - ... def __init__(self) -> None: - ... self.b = B() - >>> class B(KeyPathSupporting): - ... def __init__(self) -> None: - ... self.c = C() - >>> class C: - ... pass - >>> a = A() - >>> key_path = KeyPath.of(a.b.c) - >>> assert key_path.base is a - >>> assert key_path.keys == ("b", "c") - """ - - __invoked: bool = False - - def __call__(self, value: _Value_0, /) -> KeyPath[_Value_0]: - self.__invoked = True - - try: - recorder = _thread_local.recorder - except AttributeError: - raise RuntimeError( - " ".join( - [ - "`KeyPath.of` must be accessed and then called immediatedly", - "and should NOT be called more than once.", - ] - ) - ) - - del _thread_local.recorder - - assert not recorder.busy - - start = recorder.start - key_list = recorder.key_list - if start is _MISSING: - assert len(key_list) == 0 - - raise RuntimeError("No key has been recorded.") - else: - assert len(key_list) > 0 - - if recorder.end is not value: - raise RuntimeError( - " ".join( - [ - "Key-path is broken. Check if there is something that does", - "NOT support key-paths in the member chain.", - ] - ) - ) - - key_path = KeyPath(start, key_list) - return key_path - - def __del__(self, /) -> None: - # ! If an exception had occured during the key-path access, or this function - # ! were just discarded without being finally called, we would do some cleaning - # ! here. - if not self.__invoked: - del _thread_local.recorder - - -class KeyPathSupporting: - """ - A base class that supports key-paths. - - Examples - -------- - >>> class C(KeyPathSupporting): - ... v = 0 - >>> c = C() - >>> key_path = KeyPath.of(c.v) - >>> assert key_path.base is c - >>> assert key_path.keys == ("v",) - """ - - # ! This method is intentially not named as `__getattribute__`. See below for - # ! reason. - def _(self, key: str, /) -> Any: - try: - recorder = _thread_local.recorder - except AttributeError: - # There is no recorder, which means that `KeyPath.of` is not being called. - # So we don't need to record this key. - return super().__getattribute__(key) - - if recorder.busy: - # The recorder is busy, which means that another member is being accessed, - # typically because the computation of that member is dependent on this one. - # So we don't need to record this key. - return super().__getattribute__(key) - - recorder.busy = True - - if recorder.start is not _MISSING and recorder.end is not self: - raise RuntimeError( - " ".join( - [ - "Key-path is broken. Check if there is something that does NOT", - "support key-paths in the member chain.", - ] - ) - ) - - value = super().__getattribute__(key) - - recorder.busy = False - if recorder.start is _MISSING: - recorder.start = self - recorder.end = value - recorder.key_list.append(key) - - return value - - # ! `__getattribute__(...)` is declared against `TYPE_CHECKING`, so that unknown - # ! attributes on conforming classes won't be treated as known by type-checkers. - if not TYPE_CHECKING: - __getattribute__ = _ - - del _ - - -def key_path_supporting(clazz: type[_T], /) -> type[_T]: - """ - Patch on a class so that it can support key-paths. - - Examples - -------- - >>> @key_path_supporting - ... class C: - ... v = 0 - >>> c = C() - >>> key_path = KeyPath.of(c.v) - >>> assert key_path.base is c - >>> assert key_path.keys == ("v",) - """ - - old_getattribute = clazz.__getattribute__ - - def __getattribute__(self: _T, key: str) -> Any: - try: - recorder = _thread_local.recorder - except AttributeError: - # There is no recorder, which means that `KeyPath.of` is not being called. - # So we don't need to record this key. - return old_getattribute(self, key) - - if recorder.busy: - # The recorder is busy, which means that another member is being accessed, - # typically because the computation of that member is dependent on this one. - # So we don't need to record this key. - return old_getattribute(self, key) - - recorder.busy = True - - if recorder.start is not _MISSING and recorder.end is not self: - raise RuntimeError( - " ".join( - [ - "Key-path is broken. Check if there is something that does NOT", - "support key-paths in the member chain.", - ] - ) - ) - - value = old_getattribute(self, key) - - recorder.busy = False - if recorder.start is _MISSING: - recorder.start = self - recorder.end = value - recorder.key_list.append(key) - - return value - - clazz.__getattribute__ = __getattribute__ - - return clazz +from __future__ import annotations + +__all__ = ["KeyPath"] + +###### + +from collections.abc import Sequence +from typing import Any, Final, Generic, TypeVar + +from . import _sugarful, _sugarless + +###### + +_V_co = TypeVar("_V_co", covariant=True) +_V_0 = TypeVar("_V_0") + +###### + + +class KeyPathMeta(_sugarful._MixinMeta, _sugarless._MixinMeta): + pass + + +class KeyPath( + _sugarful._Mixin, _sugarless._Mixin, Generic[_V_co], metaclass=KeyPathMeta +): + """ + An object that stands for a member chain from a base object. + """ + + __base: Final[Any] + __keys: Final[tuple[str, ...]] + + def __init__(self, /, target: Any, keys: str | Sequence[str]) -> None: + self.__base = target + + # This initializer will seldom be called by user, so we don't need a sanity + # check here. + if isinstance(keys, str): + keys = tuple(keys.split(".")) + else: + keys = tuple(keys) + self.__keys = keys + + @property + def base(self, /) -> Any: + return self.__base + + @property + def keys(self, /) -> tuple[str, ...]: + return self.__keys + + def get(self, /) -> _V_co: + """ + Get value from the end-point of this key-path. + """ + + value = self.__base + for key in self.__keys: + value = getattr(value, key) + return value + + __call__ = get + + def unsafe_set(self: KeyPath[_V_0], value: _V_0, /) -> None: + """ + Set a value to the end-point of this key-path. + + WARNING + ------- + This method is unsafe, primarily in two ways: + + 1. It may raise exceptions if any key in the key-path doesn't allow writing. + 2. It breaks Liskov substitution principle. + """ + + target = self.__base + keys = self.__keys + i_last_key = len(keys) - 1 + for i in range(i_last_key): + target = getattr(target, keys[i]) + setattr(target, keys[i_last_key], value) + + def __hash__(self, /) -> int: + return hash((id(self.__base), self.__keys)) + + def __eq__(self, other, /) -> bool: + return ( + isinstance(other, KeyPath) + and self.__base is other.__base + and self.__keys == other.__keys + ) diff --git a/runtime_keypath/_sugarful.py b/runtime_keypath/_sugarful.py new file mode 100644 index 0000000..cd2b8ea --- /dev/null +++ b/runtime_keypath/_sugarful.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +__all__ = ["_Mixin"] + +###### + +from typing import TYPE_CHECKING, TypeVar + +###### + +# ! We are performing some conditional imports here, so that the imported items would be +# ! regarded as potentially unbound and could be used in type annotations but not in +# ! code to run. +if 0: + from typing_extensions import TypeForm + + from ._core import KeyPath + +###### + +_T = TypeVar("_T") + +###### + + +class _MixinMeta(type): + if TYPE_CHECKING: + + def __getitem__(self, key: TypeForm[_T], /) -> KeyPath[_T]: + """ + Makes `KeyPath[...]` evaluate into a `KeyPath` object + instead of a generic alias, so that `KeyPath[...] or ...` + will have the desired type. + """ + + ... + + +class _Mixin(metaclass=_MixinMeta): + """ + A mixin class that allows `KeyPath` to be used in a sugarful form. + + Examples + -------- + >>> class A: + ... def __init__(self) -> None: + ... self.b = B() + >>> class B: + ... def __init__(self) -> None: + ... self.c = C() + >>> class C: + ... pass + >>> a = A() + >>> key_path = KeyPath[int] or a.b.c + >>> assert key_path == KeyPath(base=a, keys=("b", "c")) + ``` + """ + + # TODO diff --git a/runtime_keypath/_sugarless.py b/runtime_keypath/_sugarless.py new file mode 100644 index 0000000..9a7f138 --- /dev/null +++ b/runtime_keypath/_sugarless.py @@ -0,0 +1,363 @@ +from __future__ import annotations + +__all__ = [ + "_Mixin", + "KeyPathSupporting", + "key_path_supporting", +] + +###### + +import threading +from typing import TYPE_CHECKING, Any, Final, Protocol, TypeVar, cast, final + +###### + +# ! We are performing some conditional imports here, so that the imported items would be +# ! regarded as potentially unbound and could be used in type annotations but not in +# ! code to run. +if 0: + from ._core import KeyPath + +###### + +_V_t = TypeVar("_V_t") +_V_0 = TypeVar("_V_0") + +###### + +_MISSING = cast("Any", object()) + +###### + + +class _MixinMeta(type): + """ + The metaclass for class `_Mixin`. + + It exists mainly to provide `KeyPath.of` as a property. + """ + + # ! `of` is provided as a property here, so that whenever `KeyPath.of` gets + # ! accessed, we can do something before it actually gets called. + @property + def of( + # ! `_MixinMeta` is the metaclass of only `_Mixin`, and `_Mixin` is directly + # ! inherited by only `KeyPath`, so the typing is safe here. + self: type[KeyPath], # pyright: ignore[reportGeneralTypeIssues] + /, + ) -> _KeyPathOfFunction: + # ! The docstring here is in fact for `_MixinMeta._KeyPathOfFunction`, but it + # ! will help Pylance to display some nice hint for `KeyPath.of`. + """ + Returns the key-path for accessing a certain value from a target + object with a key sequence such as `a.b.c`. + + The target object and all intermediate objects, except for the + final value, are expected to subclass `KeyPathSupporting`. + + Parameters + ---------- + `value` + A value that is accessed with chained keys such as `a.b.c`. + + Returns + ------- + A key-path that indicates the target object and the key sequence + to access the given value. + + Raises + ------ + `RuntimeError` + Typically occurs when the target or an intermediate object + isn't subclassing `KeyPathSupporting`. Check the error + message for more details. + + Example + ------- + >>> class A(KeyPathSupporting): + ... def __init__(self) -> None: + ... self.b = B() + >>> @key_path_supporting + ... class B: + ... def __init__(self) -> None: + ... self.c = C() + >>> class C: + ... pass + >>> a = A() + >>> key_path = KeyPath.of(a.b.c) + >>> assert key_path.base is a + >>> assert key_path.keys == ("b", "c") + """ + + try: + __ = _thread_local.recorder + except AttributeError: + pass + else: + raise RuntimeError( + " ".join( + [ + "An unfinished key-path recorder has been found.", + "Check if `KeyPath.of` is always called immediatelly.", + ] + ) + ) + + recorder = _KeyPathRecorder() + _thread_local.recorder = recorder + + func = _MixinMeta._KeyPathOfFunction(self) + return func + + class _KeyPathOfFunction: + """ + The class of `_MixinMeta.of`. + + We implement the result of `KeyPath.of` as a callable object, so that when an + exception occurred during the key-path access, there would still be a chance to + perform some finalization. + + Note + ---- + This docstring will be overwritten soon. + """ + + __key_path_type: Final[type[KeyPath]] + + def __init__(self, key_path_type: type[KeyPath], /) -> None: + self.__key_path_type = key_path_type + + __called: bool = False + + def __call__(self, value: _V_0, /) -> KeyPath[_V_0]: + KeyPath = self.__key_path_type + + self.__called = True + + try: + recorder = _thread_local.recorder + except AttributeError: + raise RuntimeError( + " ".join( + [ + "`KeyPath.of` must be accessed and then called immediatedly", + "and should NOT be called more than once.", + ] + ) + ) + + del _thread_local.recorder + + assert not recorder.busy + + start = recorder.start + key_list = recorder.key_list + if start is _MISSING: + assert len(key_list) == 0 + + raise RuntimeError("No key has been recorded.") + else: + assert len(key_list) > 0 + + if recorder.end is not value: + raise RuntimeError( + " ".join( + [ + "Key-path is broken. Check if there is something that does", + "NOT support key-paths in the member chain.", + ] + ) + ) + + key_path = KeyPath(start, key_list) + return key_path + + def __del__(self, /) -> None: + # ! If an exception had occured during the key-path access, or this function + # ! were just discarded without being finally called, we would do some cleaning + # ! here. + if not self.__called: + del _thread_local.recorder + + # Copy docstring so that it can be displayed upon calling `help(KeyPath.of)`. + _KeyPathOfFunction.__doc__ = of.__doc__ + + +class _Mixin(metaclass=_MixinMeta): + """ + A mixin class that allows `KeyPath` to be used in a sugarless form. + + Examples + -------- + >>> class A: + ... b: B + ... def __init__(self, /) -> None: + ... self.b = B() + + >>> class B: + ... c: int + ... def __init__(self, /) -> None: + ... self.c = 0 + + >>> a = A() + + >>> key_path = KeyPath.of(a.b.c) + + >>> assert key_path.base is a + + >>> assert key_path.keys == ("b", "c") + + """ + + +###### + + +class _ThreadLocalProtocol(Protocol): + recorder: _KeyPathRecorder + """ + The active key-path recorder for this thread. May not exist. + """ + + +_thread_local = cast("_ThreadLocalProtocol", threading.local()) + + +###### + + +@final +class _KeyPathRecorder: + __slots__ = ("busy", "start", "end", "key_list") + + busy: bool + start: Any + end: Any + key_list: list[str] + + def __init__(self, /) -> None: + self.busy = False + self.start = _MISSING + self.end = _MISSING + self.key_list = [] + + +###### + + +class KeyPathSupporting: + """ + A base class that supports key-paths. + + Examples + -------- + >>> class C(KeyPathSupporting): + ... v = 0 + >>> c = C() + >>> key_path = KeyPath.of(c.v) + >>> assert key_path.base is c + >>> assert key_path.keys == ("v",) + """ + + # ! This method is intentially not named as `__getattribute__`. See reason below. + def _(self, key: str, /) -> Any: + try: + recorder = _thread_local.recorder + except AttributeError: + # There is no recorder, which means that `KeyPath.of` is not being called. + # So we don't need to record this key. + return super().__getattribute__(key) + + if recorder.busy: + # The recorder is busy, which means that another member is being accessed, + # typically because the computation of that member is dependent on this one. + # So we don't need to record this key. + return super().__getattribute__(key) + + recorder.busy = True + + if recorder.start is not _MISSING and recorder.end is not self: + raise RuntimeError( + " ".join( + [ + "Key-path is broken. Check if there is something that does NOT", + "support key-paths in the member chain.", + ] + ) + ) + + value = super().__getattribute__(key) + + recorder.busy = False + if recorder.start is _MISSING: + recorder.start = self + recorder.end = value + recorder.key_list.append(key) + + return value + + # ! `__getattribute__(...)` is declared against `TYPE_CHECKING`, so that unknown + # ! attributes on subclasses won't be treated as known by type-checkers. + if not TYPE_CHECKING: + __getattribute__ = _ + + del _ + + +def key_path_supporting(clazz: type[_V_t], /) -> type[_V_t]: + """ + Patch on a class so that it can support key-paths. + + Examples + -------- + >>> @key_path_supporting + ... class C: + ... v = 0 + >>> c = C() + >>> key_path = KeyPath.of(c.v) + >>> assert key_path.base is c + >>> assert key_path.keys == ("v",) + """ + + old_getattribute = clazz.__getattribute__ + + def __getattribute__(self: _V_t, key: str) -> Any: + try: + recorder = _thread_local.recorder + except AttributeError: + # There is no recorder, which means that `KeyPath.of` is not being called. + # So we don't need to record this key. + return old_getattribute(self, key) + + if recorder.busy: + # The recorder is busy, which means that another member is being accessed, + # typically because the computation of that member is dependent on this one. + # So we don't need to record this key. + return old_getattribute(self, key) + + recorder.busy = True + + if recorder.start is not _MISSING and recorder.end is not self: + raise RuntimeError( + " ".join( + [ + "Key-path is broken. Check if there is something that does NOT", + "support key-paths in the member chain.", + ] + ) + ) + + value = old_getattribute(self, key) + + recorder.busy = False + if recorder.start is _MISSING: + recorder.start = self + recorder.end = value + recorder.key_list.append(key) + + return value + + clazz.__getattribute__ = __getattribute__ + + return clazz