Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 75 additions & 51 deletions observed.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def bar(self, x):

import weakref
import functools
from typing import Any, Callable, Dict, Optional, Tuple, Union

__version__ = "0.5.3"

Expand All @@ -87,15 +88,20 @@ class ObserverFunction:
does not prevent garbage collection of the observing function.
"""

def __init__(self, func, identify_observed, weakref_info):
def __init__(
self,
func: Callable[..., Any],
identify_observed: bool,
weakref_info: Tuple[Any, Dict[Any, Any]],
) -> None:
"""Initialize an ObserverFunction.

Args:
func: function I wrap. I call this function when I am called.
identify_observed: boolean indicating whether or not I will pass
the observed object as the first argument to the function I
wrap. True means pass the observed object, False means do not
pass the observed objec.
pass the observed object.
weakref_info: Tuple of (key, dict) where dict is the dictionary
which is keeping track of my role as an observer and key is
the key in that dict which maps to me. When the function I wrap
Expand All @@ -107,11 +113,11 @@ def __init__(self, func, identify_observed, weakref_info):
# weak reference to func, the call to weakref.ref returns a function
# instead of a weak ref. So, don't move the next line chomp, chomp...
functools.update_wrapper(self, func)
self.identify_observed = identify_observed
self.identify_observed: bool = identify_observed
key, d = weakref_info
self.func_wr = weakref.ref(func, CleanupHandler(key, d))
self.func_wr: weakref.ref = weakref.ref(func, CleanupHandler(key, d))

def __call__(self, observed_obj, *arg, **kw):
def __call__(self, observed_obj: Any, *arg: Any, **kw: Any) -> Any:
"""Call the function I wrap.

Args:
Expand All @@ -124,9 +130,9 @@ def __call__(self, observed_obj, *arg, **kw):
"""

if self.identify_observed:
return self.func_wr()(observed_obj, *arg, **kw)
return self.func_wr()(observed_obj, *arg, **kw) # type: ignore[union-attr]
else:
return self.func_wr()(*arg, **kw)
return self.func_wr()(*arg, **kw) # type: ignore[union-attr]


class ObserverBoundMethod:
Expand All @@ -136,7 +142,13 @@ class ObserverBoundMethod:
being an observer does not prevent garbage collection of that instance.
"""

def __init__(self, inst, method_name, identify_observed, weakref_info):
def __init__(
self,
inst: Any,
method_name: str,
identify_observed: bool,
weakref_info: Tuple[Any, Dict[Any, Any]],
) -> None:
"""Initialize an ObserverBoundMethod.

Args:
Expand All @@ -145,20 +157,20 @@ def __init__(self, inst, method_name, identify_observed, weakref_info):
identify_observed: boolean indicating whether or not I will pass
the observed object as the first argument to the function I
wrap. True means pass the observed object, False means do not
pass the observed objec.
pass the observed object.
weakref_info: Tuple of (key, dict) where dict is the dictionary
which is keeping track of my role as an observer and key is
the key in that dict which maps to me. When the function I wrap
is finalized, I use this information to delete myself from the
dictionary.
"""

self.identify_observed = identify_observed
self.identify_observed: bool = identify_observed
key, d = weakref_info
self.inst = weakref.ref(inst, CleanupHandler(key, d))
self.method_name = method_name
self.inst: weakref.ref = weakref.ref(inst, CleanupHandler(key, d))
self.method_name: str = method_name

def __call__(self, observed_obj, *arg, **kw):
def __call__(self, observed_obj: Any, *arg: Any, **kw: Any) -> Any:
"""Call the function I wrap.

Args:
Expand Down Expand Up @@ -201,18 +213,20 @@ class ObservableFunction:
proper notion of equality.
"""

def __init__(self, func):
def __init__(self, func: Callable[..., Any]) -> None:
"""Initialize an ObservableFunction.

Args:
func: The function I wrap.
"""

functools.update_wrapper(self, func)
self.func = func
self.observers = {} # observer key -> observer
self.func: Callable[..., Any] = func
self.observers: Dict[Any, Any] = {} # observer key -> observer

def add_observer(self, observer, identify_observed=False):
def add_observer(
self, observer: Callable[..., Any], identify_observed: bool = False
) -> bool:
"""Register an observer to observe me.

Args:
Expand Down Expand Up @@ -243,7 +257,7 @@ def add_observer(self, observer, identify_observed=False):
result = self._add_function(observer, identify_observed)
return result

def _add_function(self, func, identify_observed):
def _add_function(self, func: Callable[..., Any], identify_observed: bool) -> bool:
"""Add a function as an observer.

Args:
Expand All @@ -262,7 +276,7 @@ def _add_function(self, func, identify_observed):
else:
return False

def _add_bound_method(self, bound_method, identify_observed):
def _add_bound_method(self, bound_method: Callable[..., Any], identify_observed: bool) -> bool:
"""Add an bound method as an observer.

Args:
Expand All @@ -283,7 +297,7 @@ def _add_bound_method(self, bound_method, identify_observed):
else:
return False

def discard_observer(self, observer):
def discard_observer(self, observer: Callable[..., Any]) -> bool:
"""Un-register an observer.

Args:
Expand All @@ -299,7 +313,7 @@ def discard_observer(self, observer):
return discarded

@staticmethod
def make_key(observer):
def make_key(observer: Callable[..., Any]) -> Any:
"""Construct a unique, hashable, immutable key for an observer."""

if hasattr(observer, "__self__"):
Expand All @@ -310,7 +324,7 @@ def make_key(observer):
key = id(observer)
return key

def __call__(self, *arg, **kw):
def __call__(self, *arg: Any, **kw: Any) -> Any:
"""Invoke the callable which I proxy, and all of my observers.

The observers are called with the same *args and **kw as the main
Expand Down Expand Up @@ -338,7 +352,9 @@ def __call__(self, *arg, **kw):
class ObservableBoundMethod(ObservableFunction):
"""I wrap a bound method and allow observers to be registered."""

def __init__(self, func, inst, observers):
def __init__(
self, func: Callable[..., Any], inst: Any, observers: Dict[Any, Any]
) -> None:
"""Initialize an ObservableBoundMethod.

Args:
Expand All @@ -354,12 +370,12 @@ def __init__(self, func, inst, observers):
you probably grok this module.
"""

self.func = func
self.func: Callable[..., Any] = func
functools.update_wrapper(self, func)
self.inst = inst
self.observers = observers
self.inst: Any = inst
self.observers: Dict[Any, Any] = observers

def __call__(self, *arg, **kw):
def __call__(self, *arg: Any, **kw: Any) -> Any:
"""Invoke the bound method I wrap, and all of my observers.

The observers are called with the same *args and **kw as the bound
Expand All @@ -384,15 +400,15 @@ def __call__(self, *arg, **kw):
self.observers[key](self, *arg, **kw)
return result

def __eq__(self, other):
def __eq__(self, other: object) -> bool:
"""Check equality of this bound method with another."""

return all((
self.inst == other.inst,
self.func == other.func))

@property
def __self__(self):
def __self__(self) -> Any:
"""The instance to which I'm bound."""

return self.inst
Expand Down Expand Up @@ -431,17 +447,19 @@ class ObservableMethodManager_PersistOnInstances:
instance and return it.
"""

def __init__(self, func):
def __init__(self, func: Callable[..., Any]) -> None:
"""Initialize an ObservableMethodManager_PersistOnInstances.

Args:
func: the function (i.e.unbound method) I manage.
func: the function (i.e. unbound method) I manage.
"""

self._func = func
self._unbound_method = ObservableUnboundMethod(self)
self._func: Callable[..., Any] = func
self._unbound_method: ObservableUnboundMethod = ObservableUnboundMethod(self)

def __get__(self, inst, cls):
def __get__(
self, inst: Optional[Any], cls: Optional[type] = None
) -> Union[ObservableBoundMethod, "ObservableUnboundMethod"]:
"""Return an ObservableBoundMethod or ObservableUnboundMethod.

If accessed by instance, I return an ObservableBoundMethod which
Expand All @@ -465,7 +483,7 @@ def __get__(self, inst, cls):
observers = d.setdefault(self._func.__name__, {})
return ObservableBoundMethod(self._func, inst, observers)

def __set__(self, inst, val):
def __set__(self, inst: Any, val: Any) -> None:
"""Disallow setting because we don't guarantee behavior."""

raise RuntimeError("Assignment not supported")
Expand Down Expand Up @@ -500,17 +518,19 @@ class ObservableMethodManager_PersistOnDescriptor:
# instances themselves, which is done by
# ObservableMethodManager_PersistOnInstances.

def __init__(self, func):
def __init__(self, func: Callable[..., Any]) -> None:
"""Initialize an ObservableMethodManager_PersistOnDescriptor.

func is the function I will give to the ObservableBoundMethods I create.
"""
self._func = func
self._unbound_method = ObservableUnboundMethod(self)
self._func: Callable[..., Any] = func
self._unbound_method: ObservableUnboundMethod = ObservableUnboundMethod(self)
# instance id -> (inst weak ref, observers)
self.instances = {}
self.instances: Dict[int, Tuple[weakref.ref, Dict[Any, Any]]] = {}

def __get__(self, inst, cls):
def __get__(
self, inst: Optional[Any], cls: Optional[type] = None
) -> Union[ObservableBoundMethod, "ObservableUnboundMethod"]:
"""Return an ObservableBoundMethod or ObservableUnboundMethod.

If accessed by instance I return an ObservableBoundMethod which handles
Expand Down Expand Up @@ -540,25 +560,25 @@ def __get__(self, inst, cls):
self.instances[inst_id] = (wr, observers)
return ObservableBoundMethod(self._func, inst, observers)

def __set__(self, inst, val):
def __set__(self, inst: Any, val: Any) -> None:
"""Disallow setting because we don't guarantee behavior."""
raise RuntimeError("Assignment not supported")


class ObservableUnboundMethod:
"""Wrapper for an unbound version of an observable method."""

def __init__(self, manager):
def __init__(self, manager: Any) -> None:
""" Create an ObservableUnboundMethod.

Args:
manager: the descriptor in charge of this method. See
ObservableMethodManager.
"""
self._manager = manager
self._manager: Any = manager
functools.update_wrapper(self, manager._func)

def __call__(self, obj, *arg, **kw):
def __call__(self, obj: Any, *arg: Any, **kw: Any) -> Any:
""" Call the unbound method.

We essentially build a bound method and call that. This ensures that
Expand All @@ -575,17 +595,17 @@ class CleanupHandler:
Use me as a weakref.ref callback to remove an object's id from a dict when
that object is garbage collected.
"""
def __init__(self, key, d):
def __init__(self, key: Any, d: Dict[Any, Any]) -> None:
""" Initialize a cleanup handler.

Args:
key: the key we will delete.
d: the dict from which we will delete it.
"""
self.key = key
self.d = d
self.key: Any = key
self.d: Dict[Any, Any] = d

def __call__(self, wr):
def __call__(self, wr: weakref.ref) -> None:
"""Remove an entry from the dict.

When a weak ref's object expires, the CleanupHandler is called, which
Expand All @@ -598,7 +618,7 @@ def __call__(self, wr):
del self.d[self.key]


def observable_function(func):
def observable_function(func: Callable[..., Any]) -> ObservableFunction:
"""Decorate a function to make it observable.

Use me as a decorator on a function, like this:
Expand Down Expand Up @@ -630,7 +650,9 @@ def bar(self, x):
return ObservableFunction(func)


def get_observable_method(func, strategy):
def get_observable_method(
func: Callable[..., Any], strategy: str
) -> Union[ObservableMethodManager_PersistOnInstances, ObservableMethodManager_PersistOnDescriptor]:
"""Decorate a method to make it observable.

You can use me as a decorator on a method, like this:
Expand Down Expand Up @@ -704,5 +726,7 @@ def observer(x):
raise ValueError(f"Strategy {strategy} not recognized")


def observable_method(strategy='instances'):
def observable_method(
strategy: str = 'instances'
) -> Callable[[Callable[..., Any]], Union[ObservableMethodManager_PersistOnInstances, ObservableMethodManager_PersistOnDescriptor]]:
return lambda func: get_observable_method(func, strategy=strategy)