Skip to content

Commit 3e561b4

Browse files
author
Вадим Козыревский
committed
Refactor style
1 parent c79072b commit 3e561b4

8 files changed

Lines changed: 195 additions & 76 deletions

File tree

docs/event_handler/best_practices.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
| **Limit concurrency** | Set appropriate `max_concurrent_event_handlers` based on resources | Resource management |
2424
| **Idempotency** | Make event handlers idempotent when possible | Reliability |
2525
| **Logging** | Log important events for debugging and monitoring | Observability |
26+
| **Follow-up events** | Use `handler.events` for multi-level chains; follow-ups run in the same pipeline (BFS or parallel) | Design clarity |
2627

2728
!!! warning "Performance Considerations"
2829
Event handlers execute synchronously in the request context. Keep them fast to avoid blocking the main request flow.
@@ -35,6 +36,7 @@ Event handling in `python-cqrs`:
3536

3637
- **Runtime Processing** — Events are processed synchronously in the same request context
3738
- **Automatic Dispatch** — Events are automatically dispatched to registered handlers
39+
- **Event Propagation** — Handlers can return follow-up events via `events`; they are processed in the same pipeline (BFS or parallel with semaphore)
3840
- **Parallel Support** — Multiple events can be processed in parallel with configurable limits
3941
- **Two Types** — DomainEvent (in-process) and NotificationEvent (message broker)
4042
- **Side Effects** — Event handlers perform side effects without blocking command execution

docs/event_handler/event_flow.md

Lines changed: 66 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,22 @@ sequenceDiagram
4141
4242
alt DomainEvent
4343
Emitter->>Handlers: 8. Execute Event Handlers
44-
Handlers-->>Emitter: 9. Complete
44+
Handlers-->>Emitter: 9. handler.events (follow-ups)
45+
Emitter-->>Processor: 10. Return follow-ups
46+
Note over Processor: Process follow-ups in same pipeline (BFS or parallel)
4547
else NotificationEvent
4648
Emitter->>Broker: 8. Send to Message Broker
4749
Broker-->>Emitter: 9. Complete
50+
Emitter-->>Processor: 10. No follow-ups
4851
end
4952
50-
Emitter-->>Processor: 10. Complete
5153
Processor-->>Mediator: 11. Complete
5254
Mediator-->>Client: 12. Return Response
5355
```
5456

57+
!!! note "Follow-up events"
58+
For domain events, handlers can return follow-up events via the `events` property. The processor continues emitting these in the same pipeline (sequential BFS or parallel with semaphore) until the queue is empty.
59+
5560
### Detailed Event Processing Flow
5661

5762
```mermaid
@@ -63,32 +68,35 @@ graph TD
6368
D -->|No| E[Return Response]
6469
D -->|Yes| F[EventProcessor.emit_events]
6570
66-
F -->|For Each Event| G{Parallel Enabled?}
71+
F -->|Queue: initial events| G{Parallel Enabled?}
72+
73+
G -->|No| H[Sequential: BFS]
74+
G -->|Yes| I[Parallel: Semaphore + FIRST_COMPLETED]
6775
68-
G -->|No| H[Sequential: EventEmitter.emit]
69-
G -->|Yes| I[Parallel: Create Task with Semaphore]
70-
I --> J[EventEmitter.emit]
76+
H --> J[Pop Event from Queue]
77+
I --> J
7178
72-
H --> K{Event Type?}
73-
J --> K
79+
J --> K[EventEmitter.emit]
80+
K --> L{Event Type?}
7481
75-
K -->|DomainEvent| L[EventEmitter: Find Handlers]
76-
K -->|NotificationEvent| M[EventEmitter: Send to Broker]
82+
L -->|DomainEvent| M[EventMap Lookup]
83+
L -->|NotificationEvent| N[Send to Broker]
7784
78-
L --> N[EventMap Lookup]
79-
N --> O[Resolve Handler from DI]
80-
O --> P[Execute Event Handler]
81-
P --> Q{More Events?}
85+
M --> O[Resolve Handler from DI]
86+
O --> P[Execute handler.handle]
87+
P --> Q[Collect handler.events - follow-ups]
88+
Q --> R[Return follow-ups to Processor]
8289
83-
M --> Q
84-
Q -->|Yes| F
85-
Q -->|No| E
90+
N --> R
91+
R --> S{More in Queue?}
92+
S -->|Yes| G
93+
S -->|No| E
8694
8795
style A fill:#e1f5ff
8896
style B fill:#fff3e0
8997
style F fill:#c8e6c9
90-
style L fill:#c8e6c9
9198
style P fill:#c8e6c9
99+
style Q fill:#fff9c4
92100
style E fill:#f3e5f5
93101
```
94102

@@ -134,7 +142,7 @@ The `EventProcessor` handles parallel or sequential processing based on configur
134142

135143
### 3. Event Processing via EventEmitter
136144

137-
Events are processed through `EventEmitter`, which routes them based on event type:
145+
Events are processed through `EventEmitter`, which routes them based on event type. For domain events, after each handler runs, follow-up events from `handler.events` are collected and returned; the processor then continues with these in the same pipeline (BFS in sequential mode, or under the same semaphore in parallel mode).
138146

139147
```mermaid
140148
graph TD
@@ -145,25 +153,54 @@ graph TD
145153
D -->|No| E[Log Warning]
146154
D -->|Yes| F[Loop Through Handlers]
147155
F -->|3. Resolve Handler| G[DI Container]
148-
G -->|4. Execute Handler| H[Handler.handle]
149-
H -->|5. Process Side Effects| I[Complete]
156+
G -->|4. Execute handler.handle| H[Handler.handle]
157+
H -->|5. Collect handler.events| I[Follow-up events]
158+
I --> J[Return follow-ups to Processor]
150159
151-
B -->|NotificationEvent| J{Message Broker?}
152-
J -->|No| K[Raise RuntimeError]
153-
J -->|Yes| L[Send to Message Broker]
154-
L --> I
160+
B -->|NotificationEvent| K{Message Broker?}
161+
K -->|No| L[Raise RuntimeError]
162+
K -->|Yes| M[Send to Message Broker]
163+
M --> N[Return empty - no follow-ups]
155164
156165
style A fill:#e1f5ff
157166
style H fill:#c8e6c9
158-
style I fill:#fff3e0
167+
style I fill:#fff9c4
168+
style J fill:#fff3e0
169+
```
170+
171+
### 3.1. Follow-up events from event handlers (event propagation)
172+
173+
Event handlers can produce **follow-up events** by implementing the `events` property. After `handle()` is called, the emitter reads `handler.events` and returns them to the processor. These follow-ups are processed in the **same pipeline**:
174+
175+
| Mode | Behavior |
176+
|------|----------|
177+
| **Sequential** (`concurrent_event_handle_enable=False`) | Events and follow-ups are processed in **BFS order**: one event at a time, then its follow-ups are appended to the queue. |
178+
| **Parallel** (`concurrent_event_handle_enable=True`) | Events are processed under a semaphore; as soon as any task completes, its follow-ups are queued and started (FIRST_COMPLETED), without waiting for sibling events. |
179+
180+
This allows **multi-level event chains**: e.g. `OrderCreated` → handler emits `InventoryReserved` → handler emits `NotificationScheduled`, all in one run.
181+
182+
Example: handler that produces a follow-up event:
183+
184+
```python
185+
class OrderCreatedEventHandler(cqrs.EventHandler[OrderCreatedEvent]):
186+
def __init__(self) -> None:
187+
self._follow_ups: list[cqrs.IEvent] = []
188+
189+
@property
190+
def events(self) -> typing.Sequence[cqrs.IEvent]:
191+
return tuple(self._follow_ups)
192+
193+
async def handle(self, event: OrderCreatedEvent) -> None:
194+
# Side effects...
195+
self._follow_ups.append(InventoryReservedEvent(order_id=event.order_id))
159196
```
160197

161198
### 4. Event Routing
162199

163200
`EventEmitter` automatically routes events based on their type:
164201

165-
- **DomainEvent** — Processed by event handlers registered in EventMap (in-process, synchronous)
166-
- **NotificationEvent** — Sent to message broker (Kafka, RabbitMQ, etc.) for asynchronous processing
202+
- **DomainEvent** — Processed by event handlers registered in EventMap (in-process). Handlers may return follow-up events via the `events` property; these are processed in the same pipeline (BFS or parallel with semaphore).
203+
- **NotificationEvent** — Sent to message broker (Kafka, RabbitMQ, etc.) for asynchronous processing; no follow-ups.
167204

168205
!!! important "Single Processing"
169-
Events are processed **only once** through EventEmitter. There is no duplicate processing - DomainEvents are handled by event handlers, and NotificationEvents are sent to message brokers.
206+
Each event instance is processed **only once** through EventEmitter. Follow-up events returned by handlers are **new** events that are then processed in the same run (same pipeline) until the queue is empty.

docs/event_handler/event_types.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
### DomainEvent
2424

25-
Domain events represent something that happened in the domain. They are processed by event handlers:
25+
Domain events represent something that happened in the domain. They are processed by event handlers. Handlers can return **follow-up events** via the `events` property; these are processed in the same pipeline (see [Event Flow](event_flow.md)).
2626

2727
```python
2828
class UserJoined(cqrs.DomainEvent, frozen=True):
@@ -32,6 +32,7 @@ class UserJoined(cqrs.DomainEvent, frozen=True):
3232
class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
3333
async def handle(self, event: UserJoined) -> None:
3434
# Process domain event
35+
# Optionally populate self._follow_ups and return via events property
3536
...
3637
```
3738

docs/event_handler/examples.md

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,68 @@ await mediator.send(JoinMeetingCommand(user_id="123", meeting_id="456"))
7878
# Flow:
7979
# 1. Command handler executes
8080
# 2. UserJoined event is collected
81-
# 3. EventDispatcher processes event (finds UserJoinedEventHandler)
81+
# 3. EventProcessor.emit_events runs (EventEmitter finds UserJoinedEventHandler)
8282
# 4. UserJoinedEventHandler.handle() executes
8383
# 5. Response is returned
8484
```
85+
86+
---
87+
88+
## Event handler chain (follow-up events)
89+
90+
Event handlers can produce **follow-up events** via the `events` property. These are processed in the same pipeline (BFS in sequential mode, or under the same semaphore in parallel mode), enabling multi-level chains (e.g. L1 → L2 → L3).
91+
92+
```python
93+
import typing
94+
import cqrs
95+
from cqrs.events.event import IEvent
96+
97+
# Level 1: emitted by command handler
98+
class EventL1(cqrs.DomainEvent, frozen=True):
99+
seed: str
100+
101+
# Level 2: emitted by HandlerL1
102+
class EventL2(cqrs.DomainEvent, frozen=True):
103+
seed: str
104+
105+
# Level 3: emitted by HandlerL2 (terminal)
106+
class EventL3(cqrs.DomainEvent, frozen=True):
107+
seed: str
108+
109+
class HandlerL1(cqrs.EventHandler[EventL1]):
110+
def __init__(self) -> None:
111+
self._follow_ups: list[IEvent] = []
112+
113+
@property
114+
def events(self) -> typing.Sequence[IEvent]:
115+
return tuple(self._follow_ups)
116+
117+
async def handle(self, event: EventL1) -> None:
118+
# Side effects...
119+
self._follow_ups.append(EventL2(seed=event.seed))
120+
121+
class HandlerL2(cqrs.EventHandler[EventL2]):
122+
def __init__(self) -> None:
123+
self._follow_ups: list[IEvent] = []
124+
125+
@property
126+
def events(self) -> typing.Sequence[IEvent]:
127+
return tuple(self._follow_ups)
128+
129+
async def handle(self, event: EventL2) -> None:
130+
# Side effects...
131+
self._follow_ups.append(EventL3(seed=event.seed))
132+
133+
class HandlerL3(cqrs.EventHandler[EventL3]):
134+
async def handle(self, event: EventL3) -> None:
135+
# Terminal handler — no follow-ups (default events = ())
136+
pass
137+
138+
# In events_mapper:
139+
def domain_events_mapper(mapper: cqrs.EventMap) -> None:
140+
mapper.bind(EventL1, HandlerL1)
141+
mapper.bind(EventL2, HandlerL2)
142+
mapper.bind(EventL3, HandlerL3)
143+
```
144+
145+
When you emit `EventL1(seed="x")`, the processor runs: L1 → HandlerL1 emits L2 → HandlerL2 emits L3 → HandlerL3 runs. All in the same `emit_events()` call (sequential BFS or parallel with FIRST_COMPLETED).

docs/event_handler/index.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@
4444

4545
Event handlers process domain events that are emitted from command handlers. These events represent something that happened in the domain and trigger side effects like sending notifications, updating read models, or triggering other workflows.
4646

47-
When a command handler processes a request, it can emit domain events through the `events` property. These events are automatically collected and processed by event handlers registered in the system.
47+
When a command handler processes a request, it can emit domain events through the `events` property. These events are automatically collected and processed by event handlers registered in the system. **Event handlers** can in turn produce **follow-up events** via their own `events` property; these follow-ups are processed in the same pipeline (sequential BFS or parallel with semaphore), enabling multi-level event chains.
4848

4949
| Aspect | Description |
5050
|--------|-------------|
5151
| **Runtime Processing** | Events are processed synchronously in the same request context, not asynchronously |
5252
| **Automatic Dispatch** | Events are automatically dispatched to registered handlers after command execution |
53+
| **Event Propagation** | Handlers can return follow-up events via `events`; they are processed in the same run (BFS or parallel) |
5354
| **Parallel Support** | Multiple events can be processed in parallel with configurable concurrency limits |
5455
| **Side Effects** | Event handlers perform side effects without blocking the main command flow |
5556

docs/event_handler/parallel_processing.md

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,39 @@ Events can be processed in parallel to improve performance. This is controlled b
2222

2323
### How Parallel Processing Works
2424

25+
In **sequential** mode, events and follow-ups (from `handler.events`) are processed in **BFS order**: one event at a time, then its follow-ups are appended to the queue. In **parallel** mode, events are processed under a semaphore; as soon as any task completes (FIRST_COMPLETED), its follow-up events are queued and started without waiting for sibling events. `emit_events` returns only when all events and follow-ups are done.
26+
2527
```mermaid
2628
graph TD
2729
Start[EventProcessor.emit_events] --> CheckEnable{Parallel Enabled?}
2830
29-
CheckEnable -->|No| Sequential[Sequential Processing]
30-
Sequential --> LoopSeq[For Each Event]
31-
LoopSeq --> EmitSeq[EventEmitter.emit]
31+
CheckEnable -->|No| Sequential[Sequential: BFS]
32+
Sequential --> PopSeq[Pop event from queue]
33+
PopSeq --> EmitSeq[EventEmitter.emit]
3234
EmitSeq --> RouteSeq{Route Event}
3335
RouteSeq -->|DomainEvent| HandlerSeq[Execute Handlers]
3436
RouteSeq -->|NotificationEvent| BrokerSeq[Send to Broker]
35-
HandlerSeq --> NextSeq{More Events?}
36-
BrokerSeq --> NextSeq
37-
NextSeq -->|Yes| LoopSeq
38-
NextSeq -->|No| End1[End]
37+
HandlerSeq --> CollectSeq[Collect handler.events]
38+
BrokerSeq --> CollectSeq
39+
CollectSeq --> ExtendSeq[Append follow-ups to queue]
40+
ExtendSeq --> MoreSeq{Queue empty?}
41+
MoreSeq -->|No| PopSeq
42+
MoreSeq -->|Yes| End1[End]
3943
40-
CheckEnable -->|Yes| Parallel[Parallel Processing]
41-
Parallel --> LoopPar[For Each Event]
42-
LoopPar --> CreateTask[Create Task]
43-
CreateTask --> Semaphore[Acquire Semaphore]
44+
CheckEnable -->|Yes| Parallel[Parallel: Semaphore + FIRST_COMPLETED]
45+
Parallel --> PopPar[Start tasks for queued events]
46+
PopPar --> Semaphore[Acquire Semaphore per task]
4447
Semaphore --> EmitPar[EventEmitter.emit]
4548
EmitPar --> RoutePar{Route Event}
4649
RoutePar -->|DomainEvent| HandlerPar[Execute Handlers]
4750
RoutePar -->|NotificationEvent| BrokerPar[Send to Broker]
48-
HandlerPar --> ReleaseSem[Release Semaphore]
49-
BrokerPar --> ReleaseSem
50-
ReleaseSem --> NextPar{More Events?}
51-
NextPar -->|Yes| LoopPar
52-
NextPar -->|No| End2[End]
51+
HandlerPar --> CollectPar[Collect follow-ups]
52+
BrokerPar --> CollectPar
53+
CollectPar --> QueuePar[Queue follow-ups, start new tasks]
54+
QueuePar --> WaitPar[Wait FIRST_COMPLETED]
55+
WaitPar --> MorePar{Pending or queue?}
56+
MorePar -->|Yes| PopPar
57+
MorePar -->|No| End2[End]
5358
5459
style Start fill:#e1f5ff
5560
style Sequential fill:#fff3e0
@@ -59,7 +64,7 @@ graph TD
5964

6065
### Implementation
6166

62-
The `EventProcessor` handles parallel or sequential event emission:
67+
The `EventProcessor` handles parallel or sequential event emission. Follow-up events returned by handlers (via `handler.events`) are processed in the **same pipeline**: BFS in sequential mode, or under the same semaphore with FIRST_COMPLETED in parallel mode. The method returns when all events and follow-ups are done.
6368

6469
```python
6570
class EventProcessor:
@@ -75,27 +80,29 @@ class EventProcessor:
7580
self._concurrent_event_handle_enable = concurrent_event_handle_enable
7681
self._event_semaphore = asyncio.Semaphore(max_concurrent_event_handlers)
7782

78-
async def emit_events(self, events: List[Event]) -> None:
79-
"""Emit events via event emitter (parallel or sequential)."""
83+
async def emit_events(self, events: Sequence[IEvent]) -> None:
84+
"""Emit events and process follow-ups in the same pipeline."""
8085
if not events or not self._event_emitter:
8186
return
8287

8388
if not self._concurrent_event_handle_enable:
84-
# Sequential processing
85-
for event in events:
86-
await self._event_emitter.emit(event)
89+
# Sequential: BFS over events and follow-ups
90+
to_process = deque(events)
91+
while to_process:
92+
event = to_process.popleft()
93+
follow_ups = await self._event_emitter.emit(event)
94+
to_process.extend(follow_ups)
8795
else:
88-
# Parallel processing with semaphore limit (fire-and-forget)
89-
for event in events:
90-
asyncio.create_task(self._emit_event_with_semaphore(event))
96+
# Parallel: tasks under semaphore; follow-ups queued on FIRST_COMPLETED
97+
await self._emit_events_parallel_first_completed(deque(events))
9198

92-
async def _emit_event_with_semaphore(self, event: Event) -> None:
93-
"""Emit a single event with semaphore limit."""
99+
async def _emit_one_event(self, event: IEvent) -> Sequence[IEvent]:
100+
"""Emit one event under semaphore; returns follow-ups from handler.events."""
94101
async with self._event_semaphore:
95-
await self._event_emitter.emit(event)
102+
return await self._event_emitter.emit(event)
96103
```
97104

98-
The `EventEmitter` then routes events to handlers or message brokers based on event type.
105+
The `EventEmitter.emit()` returns follow-up events from domain event handlers; the processor continues with these until the queue is empty.
99106

100107
### Configuration
101108

@@ -140,9 +147,10 @@ class ProcessOrderCommandHandler(RequestHandler[ProcessOrderCommand, None]):
140147
self._events.append(EmailNotificationEvent(...))
141148

142149
# With max_concurrent_event_handlers=3:
143-
# - Events 1-3 emit in parallel (fire-and-forget tasks)
144-
# - Event 4 waits for a semaphore slot
150+
# - Up to 3 events (or follow-ups) run at once under the semaphore
151+
# - When any task completes, its follow-ups (from handler.events) are queued and started (FIRST_COMPLETED)
152+
# - emit_events() returns only when all events and follow-ups are done
145153
# - Each event is routed by EventEmitter:
146-
# - DomainEvents → processed by handlers
147-
# - NotificationEvents → sent to message broker
154+
# - DomainEvents → processed by handlers (follow-ups collected and processed in same pipeline)
155+
# - NotificationEvents → sent to message broker (no follow-ups)
148156
```

0 commit comments

Comments
 (0)