Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 188 additions & 0 deletions docs/listeners.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,194 @@ Paulista Avenue after: red--(cycle)-->green
```


## Class-level listener declarations

```{versionadded} 3.0.0
```

You can declare listeners at the class level so they are automatically attached to every
instance of the state machine. This is useful for cross-cutting concerns like logging,
persistence, or telemetry that should always be present.

The `listeners` class attribute accepts two forms:

- **Callable** (class, `functools.partial`, lambda): acts as a factory — called once per
SM instance to produce a fresh listener. Use this for listeners that accumulate state.
- **Instance** (pre-built object): shared across all SM instances. Use this for stateless
listeners like a global logger.

```py
>>> from statemachine import State, StateChart

>>> class AuditListener:
... def __init__(self):
... self.log = []
...
... def after_transition(self, event, source, target):
... self.log.append(f"{event}: {source.id} -> {target.id}")

>>> class OrderMachine(StateChart):
... listeners = [AuditListener]
...
... draft = State(initial=True)
... confirmed = State(final=True)
... confirm = draft.to(confirmed)

>>> sm = OrderMachine()
>>> sm.send("confirm")
>>> [type(l).__name__ for l in sm.active_listeners]
['AuditListener']

>>> sm.active_listeners[0].log
['confirm: draft -> confirmed']

```

### Listeners with configuration

Use `functools.partial` to pass configuration to listener factories:

```py
>>> from functools import partial

>>> class HistoryListener:
... def __init__(self, max_size=50):
... self.max_size = max_size
... self.entries = []
...
... def after_transition(self, event, source, target):
... self.entries.append(f"{source.id} -> {target.id}")
... if len(self.entries) > self.max_size:
... self.entries.pop(0)

>>> class TrackedMachine(StateChart):
... listeners = [partial(HistoryListener, max_size=10)]
...
... s1 = State(initial=True)
... s2 = State(final=True)
... go = s1.to(s2)

>>> sm = TrackedMachine()
>>> sm.send("go")
>>> sm.active_listeners[0].entries
['s1 -> s2']

```

### Runtime listeners merge with class-level

Runtime listeners passed via the `listeners=` constructor parameter are appended after
class-level listeners:

```py
>>> runtime_listener = AuditListener()
>>> sm = OrderMachine(listeners=[runtime_listener])
>>> sm.send("confirm")
>>> [type(l).__name__ for l in sm.active_listeners]
['AuditListener', 'AuditListener']

>>> runtime_listener.log
['confirm: draft -> confirmed']

```

### Inheritance

Child class listeners are appended after parent listeners. The full MRO chain is respected:

```py
>>> class LogListener:
... pass

>>> class BaseMachine(StateChart):
... listeners = [LogListener]
...
... s1 = State(initial=True)
... s2 = State(final=True)
... go = s1.to(s2)

>>> class ChildMachine(BaseMachine):
... listeners = [AuditListener]

>>> sm = ChildMachine()
>>> [type(l).__name__ for l in sm.active_listeners]
['LogListener', 'AuditListener']

```

To **replace** parent listeners instead of extending, set `listeners_inherit = False`:

```py
>>> class ReplacedMachine(BaseMachine):
... listeners_inherit = False
... listeners = [AuditListener]

>>> sm = ReplacedMachine()
>>> [type(l).__name__ for l in sm.active_listeners]
['AuditListener']

```

### Listener `setup()` protocol

Listeners that need runtime dependencies (e.g., a database session, Redis client) can
define a `setup()` method. It is called during SM `__init__` with the SM instance and
any extra `**kwargs` passed to the constructor. The {ref}`dynamic-dispatch` mechanism
ensures each listener receives only the kwargs it declares:

```py
>>> class DBListener:
... def __init__(self):
... self.session = None
...
... def setup(self, sm, session=None, **kwargs):
... self.session = session

>>> class PersistentMachine(StateChart):
... listeners = [DBListener]
...
... s1 = State(initial=True)
... s2 = State(final=True)
... go = s1.to(s2)

>>> sm = PersistentMachine(session="my_db_session")
>>> sm.active_listeners[0].session
'my_db_session'

```

Multiple listeners with different dependencies compose naturally — each `setup()` picks
only the kwargs it needs:

```py
>>> class CacheListener:
... def __init__(self):
... self.redis = None
...
... def setup(self, sm, redis=None, **kwargs):
... self.redis = redis

>>> class FullMachine(StateChart):
... listeners = [DBListener, CacheListener]
...
... s1 = State(initial=True)
... s2 = State(final=True)
... go = s1.to(s2)

>>> sm = FullMachine(session="db_conn", redis="redis_conn")
>>> sm.active_listeners[0].session
'db_conn'
>>> sm.active_listeners[1].redis
'redis_conn'

```

```{note}
The `setup()` method is only called on **factory-created** instances (callable entries).
Shared instances (pre-built objects) do not receive `setup()` calls — they are assumed
to be already configured by whoever created them.
```

```{hint}
The `StateChart` itself is registered as a listener, so by using `listeners` an
external object can have the same level of functionalities provided to the built-in class.
Expand Down
15 changes: 15 additions & 0 deletions docs/releases/3.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,21 @@ class GameCharacter(StateChart):
See {ref}`weighted-transitions` for full documentation.


### Class-level listener declarations

Listeners can now be declared at the class level using the `listeners` attribute, so they are
automatically attached to every instance. The list accepts callables (classes, `partial`, lambdas)
as factories that create a fresh listener per instance, or pre-built instances that are shared.

A `setup()` protocol allows factory-created listeners to receive runtime dependencies
(DB sessions, Redis clients, etc.) via `**kwargs` forwarded from the SM constructor.

Inheritance is supported: child listeners are appended after parent listeners, unless
`listeners_inherit = False` is set to replace them entirely.

See {ref}`observers` for full documentation.


### Async concurrent event result routing

When multiple coroutines send events concurrently via `asyncio.gather`, each
Expand Down
22 changes: 22 additions & 0 deletions statemachine/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(
)
cls.add_inherited(bases)
cls.add_from_attributes(attrs)
cls._collect_class_listeners(attrs, bases)
cls._unpack_builders_callbacks()
cls._update_event_references()

Expand Down Expand Up @@ -233,6 +234,27 @@ def _setup(cls):
"send",
} | {s.id for s in cls.states}

def _collect_class_listeners(cls, attrs: Dict[str, Any], bases: Tuple[type]):
"""Collect class-level listener declarations from attrs and MRO.

Listeners declared on parent classes are prepended (MRO order),
unless the child sets ``listeners_inherit = False``.
"""
class_listeners: List[Any] = []
if attrs.get("listeners_inherit", True):
for base in reversed(bases):
class_listeners.extend(getattr(base, "_class_listeners", []))
for entry in attrs.get("listeners", []):
if entry is None or isinstance(entry, (str, int, float, bool)):
raise InvalidDefinition(
_(
"Invalid entry in 'listeners': {!r}. "
"Expected a class, callable, or listener instance."
).format(entry)
)
class_listeners.append(entry)
cls._class_listeners: List[Any] = class_listeners

def add_inherited(cls, bases):
for base in bases:
for state in getattr(base, "states", []):
Expand Down
42 changes: 39 additions & 3 deletions statemachine/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .graph import iterate_states_and_transitions
from .i18n import _
from .model import Model
from .signature import SignatureAdapter
from .utils import run_async_from_sync

if TYPE_CHECKING:
Expand Down Expand Up @@ -129,6 +130,7 @@ class StateChart(Generic[TModel], metaclass=StateMachineMetaclass):
_events: "Dict[Event, None]"
_protected_attrs: set
_specs: CallbackSpecList
_class_listeners: List[Any]
prepare: SpecListGrouper

def __init__(
Expand All @@ -137,6 +139,7 @@ def __init__(
state_field: str = "state",
start_value: Any = None,
listeners: "List[object] | None" = None,
**kwargs: Any,
):
self.model: TModel = model if model is not None else Model() # type: ignore[assignment]
self.history_values: Dict[
Expand All @@ -154,7 +157,9 @@ def __init__(
if self._abstract:
raise InvalidDefinition(_("There are no states or transitions."))

self._register_callbacks(listeners or [])
class_listener_instances = self._resolve_class_listeners(**kwargs)
all_listeners = class_listener_instances + (listeners or [])
self._register_callbacks(all_listeners)

# Activate the initial state, this only works if the outer scope is sync code.
# for async code, the user should manually call `await sm.activate_initial_state()`
Expand All @@ -168,6 +173,26 @@ def _get_engine(self):

return SyncEngine(self)

def _resolve_class_listeners(self, **kwargs: Any) -> List[object]:
resolved: List[object] = []
for entry in self._class_listeners:
if callable(entry):
instance = entry()
setup = getattr(instance, "setup", None)
if setup is not None:
sig = SignatureAdapter.from_callable(setup)
ba = sig.bind_expected(self, **kwargs)
try:
setup(*ba.args, **ba.kwargs)
except TypeError as err:
raise TypeError(
f"Error calling setup() on listener {type(instance).__name__}: {err}"
) from err
else:
instance = entry
resolved.append(instance)
return resolved

def activate_initial_state(self) -> Any:
result = self._engine.activate_initial_state()
if not isawaitable(result):
Expand Down Expand Up @@ -199,11 +224,13 @@ def __setstate__(self, state: Dict[str, Any]) -> None:
self.__dict__.update(state) # type: ignore[attr-defined]
self._callbacks = CallbacksRegistry()
self._states_for_instance = {}

self._listeners = {}

# _listeners already contained both class-level and runtime listeners
# when serialized, so just re-register them all.
self._register_callbacks([])
self.add_listener(*listeners.values())
if listeners:
self.add_listener(*listeners.values())
self._engine = self._get_engine()
self._engine.start()

Expand Down Expand Up @@ -268,6 +295,15 @@ def _register_callbacks(self, listeners: List[object]):

self._callbacks.async_or_sync()

@property
def active_listeners(self) -> List[object]:
"""List of all active listeners attached to this instance.

Includes class-level listeners (resolved from the ``listeners`` class attribute),
constructor ``listeners=`` parameter, and any added via :meth:`add_listener`.
"""
return list(self._listeners.values())

def add_listener(self, *listeners):
"""Add a listener.

Expand Down
Loading
Loading