Skip to content

Commit 95aed36

Browse files
vadikko2Вадим Козыревский
andauthored
[Feature] Fallbacks for requests and events handling (#64)
* Add Fallback for requests and events * fixes after review * add type checking --------- Co-authored-by: Вадим Козыревский <v.kozyrevskiy@timeweb.ru>
1 parent bf46947 commit 95aed36

28 files changed

Lines changed: 1980 additions & 125 deletions

examples/cor_request_fallback.py

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
"""
2+
Example: Chain of Responsibility with Request Handler Fallback
3+
4+
This example shows how to combine a COR (Chain of Responsibility) handler
5+
with RequestHandlerFallback. The primary handler is a RequestHandler that
6+
delegates to a COR chain; when the chain raises (e.g. downstream failure),
7+
the fallback handler is invoked.
8+
9+
Use case: A request is first tried through a chain of handlers (e.g. try
10+
cache, then DB, then external API). If the whole chain fails (e.g. connection
11+
error), a fallback handler returns a default/cached response.
12+
13+
================================================================================
14+
HOW TO RUN THIS EXAMPLE
15+
================================================================================
16+
17+
Run the example:
18+
python examples/cor_request_fallback.py
19+
20+
The example will:
21+
- Send a command that is handled by a COR chain (primary path)
22+
- For source="error", the chain raises and fallback handler runs
23+
- For source="a" or "b", the chain handles the request successfully
24+
25+
================================================================================
26+
WHAT THIS EXAMPLE DEMONSTRATES
27+
================================================================================
28+
29+
1. RequestHandlerFallback with COR as primary:
30+
- Primary is a RequestHandler that delegates to a COR chain (injected via DI).
31+
- Fallback is a simple RequestHandler used when the chain raises.
32+
33+
2. Building the chain:
34+
- Create COR handler instances, build_chain(), then bind the chain entry
35+
(first handler) in the container so the wrapper can receive it.
36+
37+
3. Flow:
38+
- mediator.send(request) dispatches to primary (CORChainWrapperHandler).
39+
- Wrapper calls the chain; if the chain raises, dispatcher catches and
40+
invokes fallback.
41+
42+
4. Optional failure_exceptions:
43+
- Restrict fallback to specific exception types (e.g. ConnectionError).
44+
45+
================================================================================
46+
REQUIREMENTS
47+
================================================================================
48+
49+
Make sure you have installed:
50+
- cqrs (this package)
51+
- di (dependency injection)
52+
53+
================================================================================
54+
"""
55+
56+
import asyncio
57+
import logging
58+
59+
import di
60+
from di import dependent
61+
62+
import cqrs
63+
from cqrs.requests import bootstrap
64+
from cqrs.requests.cor_request_handler import (
65+
CORRequestHandler,
66+
build_chain,
67+
)
68+
69+
logging.basicConfig(level=logging.INFO)
70+
logger = logging.getLogger(__name__)
71+
72+
HANDLER_SOURCE: list[str] = [] # "chain" or "fallback"
73+
74+
75+
# -----------------------------------------------------------------------------
76+
# Command and response
77+
# -----------------------------------------------------------------------------
78+
79+
80+
class FetchDataCommand(cqrs.Request):
81+
source: str # "a" | "b" | "error"
82+
83+
84+
class FetchDataResult(cqrs.Response):
85+
data: str
86+
source: str # "chain" or "fallback"
87+
88+
89+
# -----------------------------------------------------------------------------
90+
# COR handlers (chain)
91+
# -----------------------------------------------------------------------------
92+
93+
94+
class SourceAHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]):
95+
@property
96+
def events(self) -> list[cqrs.Event]:
97+
return []
98+
99+
async def handle(self, request: FetchDataCommand) -> FetchDataResult | None:
100+
if request.source == "a":
101+
logger.info("COR chain: SourceAHandler handled source=a")
102+
HANDLER_SOURCE.append("chain")
103+
return FetchDataResult(data="data_from_a", source="chain")
104+
return await self.next(request)
105+
106+
107+
class SourceBHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]):
108+
@property
109+
def events(self) -> list[cqrs.Event]:
110+
return []
111+
112+
async def handle(self, request: FetchDataCommand) -> FetchDataResult | None:
113+
if request.source == "b":
114+
logger.info("COR chain: SourceBHandler handled source=b")
115+
HANDLER_SOURCE.append("chain")
116+
return FetchDataResult(data="data_from_b", source="chain")
117+
return await self.next(request)
118+
119+
120+
class DefaultChainHandler(CORRequestHandler[FetchDataCommand, FetchDataResult]):
121+
"""Last in chain: handles unknown or raises for source='error'."""
122+
123+
@property
124+
def events(self) -> list[cqrs.Event]:
125+
return []
126+
127+
async def handle(self, request: FetchDataCommand) -> FetchDataResult | None:
128+
if request.source == "error":
129+
logger.info("COR chain: DefaultChainHandler raising ConnectionError for source=error")
130+
raise ConnectionError("Downstream service unavailable")
131+
logger.info("COR chain: DefaultChainHandler handled (unknown source)")
132+
HANDLER_SOURCE.append("chain")
133+
return FetchDataResult(data="default_data", source="chain")
134+
135+
136+
# -----------------------------------------------------------------------------
137+
# Wrapper: RequestHandler that delegates to the COR chain
138+
# -----------------------------------------------------------------------------
139+
140+
141+
class CORChainWrapperHandler(
142+
cqrs.RequestHandler[FetchDataCommand, FetchDataResult],
143+
):
144+
"""Primary 'handler' that runs the COR chain; chain is injected as the first link."""
145+
146+
def __init__(self, chain_entry: SourceAHandler) -> None:
147+
self._chain_entry = chain_entry
148+
149+
@property
150+
def events(self) -> list[cqrs.Event]:
151+
return []
152+
153+
async def handle(self, request: FetchDataCommand) -> FetchDataResult:
154+
result = await self._chain_entry.handle(request)
155+
if result is None:
156+
raise ValueError("COR chain did not handle the request")
157+
return result
158+
159+
160+
# -----------------------------------------------------------------------------
161+
# Fallback handler (used when the chain raises)
162+
# -----------------------------------------------------------------------------
163+
164+
165+
class FallbackFetchDataHandler(
166+
cqrs.RequestHandler[FetchDataCommand, FetchDataResult],
167+
):
168+
@property
169+
def events(self) -> list[cqrs.Event]:
170+
return []
171+
172+
async def handle(self, request: FetchDataCommand) -> FetchDataResult:
173+
logger.info("Fallback handler: returning cached/default for source=%s", request.source)
174+
HANDLER_SOURCE.append("fallback")
175+
return FetchDataResult(
176+
data="cached_or_default",
177+
source="fallback",
178+
)
179+
180+
181+
# -----------------------------------------------------------------------------
182+
# Mappers and bootstrap
183+
# -----------------------------------------------------------------------------
184+
185+
186+
def commands_mapper(mapper: cqrs.RequestMap) -> None:
187+
mapper.bind(
188+
FetchDataCommand,
189+
cqrs.RequestHandlerFallback(
190+
primary=CORChainWrapperHandler,
191+
fallback=FallbackFetchDataHandler,
192+
failure_exceptions=(ConnectionError, TimeoutError),
193+
),
194+
)
195+
196+
197+
async def main() -> None:
198+
HANDLER_SOURCE.clear()
199+
200+
# Build COR chain and inject the chain entry so CORChainWrapperHandler gets it
201+
source_a = SourceAHandler()
202+
source_b = SourceBHandler()
203+
default = DefaultChainHandler()
204+
build_chain([source_a, source_b, default])
205+
206+
di_container = di.Container()
207+
di_container.bind(
208+
di.bind_by_type(
209+
dependent.Dependent(lambda: source_a, scope="request"),
210+
SourceAHandler,
211+
),
212+
)
213+
214+
mediator = bootstrap.bootstrap(
215+
di_container=di_container,
216+
commands_mapper=commands_mapper,
217+
)
218+
219+
print("\n" + "=" * 60)
220+
print("COR REQUEST HANDLER FALLBACK EXAMPLE")
221+
print("=" * 60)
222+
223+
# Case 1: chain handles (source=a)
224+
print("\n1. Send FetchDataCommand(source='a') — chain handles")
225+
result1: FetchDataResult = await mediator.send(FetchDataCommand(source="a"))
226+
print(f" Result: data={result1.data}, source={result1.source}")
227+
assert result1.source == "chain" and result1.data == "data_from_a"
228+
229+
# Case 2: chain handles (source=b)
230+
print("\n2. Send FetchDataCommand(source='b') — chain handles")
231+
result2: FetchDataResult = await mediator.send(FetchDataCommand(source="b"))
232+
print(f" Result: data={result2.data}, source={result2.source}")
233+
assert result2.source == "chain" and result2.data == "data_from_b"
234+
235+
# Case 3: chain raises (source=error) -> fallback runs
236+
print("\n3. Send FetchDataCommand(source='error') — chain raises, fallback runs")
237+
result3: FetchDataResult = await mediator.send(FetchDataCommand(source="error"))
238+
print(f" Result: data={result3.data}, source={result3.source}")
239+
assert result3.source == "fallback" and result3.data == "cached_or_default"
240+
241+
print("\n Handlers that ran (in order): " + str(HANDLER_SOURCE))
242+
assert "chain" in HANDLER_SOURCE and "fallback" in HANDLER_SOURCE
243+
print("\n" + "=" * 60 + "\n")
244+
245+
246+
if __name__ == "__main__":
247+
asyncio.run(main())

0 commit comments

Comments
 (0)