Commit 556f13e

Update documentation

Author: Вадим Козыревский
1 parent ded9d68, commit 556f13e

3 files changed

Lines changed: 70 additions & 13 deletions

docs/index.md

Lines changed: 3 additions & 0 deletions
@@ -20,6 +20,9 @@
 
 </div>
 
+!!! warning "Breaking Changes in v5.0.0"
+    Starting with version 5.0.0, **Pydantic support will become optional**. The default implementations of `Request`, `Response`, `DomainEvent`, and `NotificationEvent` will be migrated to dataclasses-based implementations.
+
 ---
 
 ## Core Features

docs/saga/recovery.md

Lines changed: 61 additions & 13 deletions
@@ -10,23 +10,25 @@
 
 </div>
 
-Recovery ensures eventual consistency by resuming interrupted sagas from persistent storage, guaranteeing all sagas eventually reach a terminal state (COMPLETED or FAILED).
+Recovery ensures eventual consistency by resuming interrupted sagas from persistent storage, guaranteeing all sagas eventually reach a terminal state (COMPLETED or FAILED). Recovery **attempts** are tracked per saga so you can limit retries and exclude persistently failing sagas.
 
 ## Overview
 
 Sagas can be interrupted due to server crashes, network timeouts, or system overload. Recovery solves this by:
 
 1. Persisting saga state after each step
-2. Periodically scanning for incomplete sagas
+2. Periodically scanning for incomplete sagas (via `get_sagas_for_recovery`)
 3. Resuming execution from the last completed step
 4. Completing compensation if saga was in compensating state
+5. Tracking **recovery attempts** — on recovery failure, the storage increments `recovery_attempts` automatically so sagas can be retried or excluded when the limit is reached
 
 ## Eventual Consistency
 
 The saga pattern ensures eventual consistency through:
 
 - **Persistent State** — Saved after each step
 - **Recovery Mechanism** — Interrupted sagas can be resumed
+- **Recovery Attempts** — Each saga has a `recovery_attempts` counter; it is incremented automatically when recovery fails, so you can limit retries and exclude sagas that exceed `max_recovery_attempts`
 - **Compensation Guarantee** — Failed sagas are always compensated
 - **Terminal States** — All sagas eventually reach COMPLETED or FAILED
 
@@ -93,6 +95,33 @@ except RuntimeError:
 **Status:** `COMPLETED`
 **Recovery:** No action needed
 
+## Recovery Attempts
+
+Each saga in storage has a **recovery_attempts** counter. It is used to:
+
+- **Limit retries** — Sagas that fail recovery repeatedly can be excluded from future recovery runs
+- **Avoid infinite loops** — Persistently failing sagas (e.g. due to bad data) stop being picked after `max_recovery_attempts`
+
+**Automatic increment:** When `recover_saga()` fails (exception during resume), the storage's `increment_recovery_attempts(saga_id, new_status=SagaStatus.FAILED)` is called automatically. Callers do **not** need to call `increment_recovery_attempts` themselves.
+
+**Getting sagas for recovery:** Use `storage.get_sagas_for_recovery()` instead of a custom query:
+
+```python
+ids = await storage.get_sagas_for_recovery(
+    limit=50,
+    max_recovery_attempts=5,  # Only sagas with recovery_attempts < 5
+    stale_after_seconds=120,  # Only sagas not updated in last 2 minutes (avoids picking active sagas)
+)
+```
+
+| Parameter | Description |
+|-----------|-------------|
+| `limit` | Maximum number of saga IDs to return |
+| `max_recovery_attempts` | Only include sagas with `recovery_attempts` strictly less than this value (default: 5) |
+| `stale_after_seconds` | If set, only include sagas whose `updated_at` is older than (now − this value). Use to avoid picking sagas currently being executed. `None` = no filter |
+
+Returns saga IDs in status RUNNING, COMPENSATING, or FAILED, ordered by `updated_at` ascending (oldest first).
+
 ## Strict Backward Recovery
 
 Once a saga enters `COMPENSATING` or `FAILED` status, forward execution is **permanently disabled**. Only compensation can proceed.
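
The selection rule that the hunk above documents for `get_sagas_for_recovery` (status filter, attempts strictly below the limit, optional staleness cutoff, oldest first, capped by `limit`) can be sketched in plain Python. This is a minimal illustration over an in-memory list, not the library's implementation: `SagaRow`, `select_for_recovery`, and the explicit `now` argument are hypothetical names introduced here, and statuses are plain strings rather than `SagaStatus` members.

```python
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Statuses the documentation lists as eligible for recovery.
RECOVERABLE = {"RUNNING", "COMPENSATING", "FAILED"}


@dataclass
class SagaRow:  # hypothetical record, not the library's model
    saga_id: uuid.UUID
    status: str
    recovery_attempts: int
    updated_at: datetime


def select_for_recovery(rows, limit, max_recovery_attempts=5,
                        stale_after_seconds=None, now=None):
    """Return saga IDs following the rule described in the docs."""
    now = now or datetime.now(timezone.utc)
    # With no staleness setting there is no cutoff at all.
    cutoff = (None if stale_after_seconds is None
              else now - timedelta(seconds=stale_after_seconds))
    eligible = [
        row for row in rows
        if row.status in RECOVERABLE
        and row.recovery_attempts < max_recovery_attempts  # strictly less
        and (cutoff is None or row.updated_at < cutoff)
    ]
    eligible.sort(key=lambda row: row.updated_at)  # oldest first
    return [row.saga_id for row in eligible[:limit]]
```

Passing `now` explicitly keeps the function deterministic in tests; a database-backed storage would more likely express the same conditions in a single query.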
@@ -103,20 +132,33 @@ This prevents "zombie states" where compensation actions conflict with new execu
 
 ### Background Recovery Job
 
+Use `storage.get_sagas_for_recovery()` to get saga IDs that need recovery. On recovery failure, `recover_saga()` calls `increment_recovery_attempts` internally — no extra code needed.
+
 ```python
 import asyncio
 from cqrs.saga.recovery import recover_saga
 
-async def recovery_job():
+async def recovery_job(storage, saga, context_builder, container):
     while True:
-        incomplete_sagas = await find_incomplete_sagas()
-        for saga_id in incomplete_sagas:
+        ids = await storage.get_sagas_for_recovery(
+            limit=50,
+            max_recovery_attempts=5,
+            stale_after_seconds=120,  # Avoid sagas currently being executed
+        )
+        for saga_id in ids:
             try:
-                await recover_saga(saga, saga_id, OrderContext)
+                await recover_saga(
+                    saga=saga,
+                    saga_id=saga_id,
+                    context_builder=context_builder,
+                    container=container,
+                    storage=storage,
+                )
             except RuntimeError:
-                pass  # Compensation completed
+                pass  # Expected when compensation completed (forward execution not allowed)
             except Exception as e:
-                logger.error(f"Recovery failed: {e}")
+                logger.error(f"Recovery failed for {saga_id}: {e}")
+                # recovery_attempts already incremented by recover_saga
         await asyncio.sleep(60)  # Scan every minute
 ```
 
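
The `recovery_job` in the hunk above uses `logger` without defining it and loops forever, which makes it awkward to test. One possible arrangement, sketched below, is to factor a single pass into its own coroutine and inject the recovery call. `recover_once`, `fake_recover`, and the logger name are hypothetical; in real use the injected callable would wrap `recover_saga(...)` with the keyword arguments shown in the hunk.

```python
import asyncio
import logging

logger = logging.getLogger("saga.recovery")  # hypothetical logger name


async def recover_once(saga_ids, recover):
    """Run one recovery pass; `recover` is an async callable taking a saga ID."""
    outcome = {"recovered": [], "compensated": [], "failed": []}
    for saga_id in saga_ids:
        try:
            await recover(saga_id)
            outcome["recovered"].append(saga_id)
        except RuntimeError:
            # Per the docs: expected when compensation completed.
            outcome["compensated"].append(saga_id)
        except Exception as exc:
            # Per the docs: recovery_attempts is already incremented here.
            logger.error("Recovery failed for %s: %s", saga_id, exc)
            outcome["failed"].append(saga_id)
    return outcome


async def fake_recover(saga_id):  # hypothetical stand-in for recover_saga
    if saga_id == "b":
        raise RuntimeError("compensation completed")
    if saga_id == "c":
        raise ValueError("bad data")


result = asyncio.run(recover_once(["a", "b", "c"], fake_recover))
```

Returning the three buckets also gives the caller something to feed into the metrics that the Best Practices list recommends tracking.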
@@ -127,13 +169,19 @@ async def recovery_job():
 from apscheduler.schedulers.asyncio import AsyncIOScheduler
 
 scheduler = AsyncIOScheduler()
-scheduler.add_job(recovery_job, 'interval', minutes=5)
+scheduler.add_job(
+    lambda: recovery_job(storage, OrderSaga(), OrderContext, container),
+    'interval',
+    minutes=5,
+)
 scheduler.start()
 ```
 
 ## Best Practices
 
-1. **Run recovery periodically** — Background job to scan for incomplete sagas
-2. **Handle failures** — Log errors and send alerts
-3. **Monitor metrics** — Track recovery rate, duration, and failures
-4. **Use persistent storage** — Memory storage loses data on restart
+1. **Run recovery periodically** — Background job using `get_sagas_for_recovery()` to scan for incomplete sagas
+2. **Use `max_recovery_attempts`** — Exclude sagas that fail recovery too many times (e.g. 5) to avoid infinite retries
+3. **Use `stale_after_seconds`** — Avoid picking sagas that are currently being executed by another worker
+4. **Handle failures** — Log errors and send alerts; `increment_recovery_attempts` is called automatically by `recover_saga`
+5. **Monitor metrics** — Track recovery rate, duration, failures, and sagas exceeding max attempts
+6. **Use persistent storage** — Memory storage loses data on restart
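
One detail of the scheduler hunk above may be worth verifying: a `lambda` that returns a coroutine is not itself a coroutine function. A scheduler that decides whether to await a job by inspecting the callable would call the lambda and drop the un-awaited coroutine. My understanding is that APScheduler 3.x's asyncio executor works this way, but treat that as an assumption to check against the installed version. The stdlib-only sketch below shows the difference; `recovery_pass` is a hypothetical stand-in for the job.

```python
import functools
import inspect


async def recovery_pass(storage, saga):
    """Hypothetical single recovery pass; the body is irrelevant here."""
    return (storage, saga)


# Two ways of binding arguments to the job before handing it to a scheduler.
as_lambda = lambda: recovery_pass("storage", "saga")
as_partial = functools.partial(recovery_pass, "storage", "saga")

checks = {
    "plain": inspect.iscoroutinefunction(recovery_pass),
    "lambda": inspect.iscoroutinefunction(as_lambda),
    "partial": inspect.iscoroutinefunction(as_partial),  # True on Python 3.8+
}
print(checks)  # {'plain': True, 'lambda': False, 'partial': True}
```

`add_job` also accepts `args` and `kwargs`, so passing the coroutine function directly with `args=[storage, OrderSaga(), OrderContext, container]` avoids the wrapper altogether. Separately, `recovery_job` as written already loops with `while True`, so an interval-scheduled job would more naturally call a single-pass function.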

docs/saga/storage.md

Lines changed: 6 additions & 0 deletions
@@ -31,8 +31,13 @@ class ISagaStorage(abc.ABC):
     async def log_step(saga_id, step_name, action, status, details=None) -> None
     async def load_saga_state(saga_id, *, read_for_update: bool = False) -> tuple[SagaStatus, dict, int]
     async def get_step_history(saga_id) -> list[SagaLogEntry]
+    async def get_sagas_for_recovery(limit, max_recovery_attempts=5, stale_after_seconds=None) -> list[uuid.UUID]
+    async def increment_recovery_attempts(saga_id, new_status: SagaStatus | None = None) -> None
 ```
 
+- **get_sagas_for_recovery** — Returns saga IDs that need recovery (RUNNING, COMPENSATING, FAILED) with `recovery_attempts` < `max_recovery_attempts`, optionally filtered by staleness. Used by recovery jobs.
+- **increment_recovery_attempts** — Called automatically by `recover_saga()` on recovery failure; increments `recovery_attempts` and optionally updates status (e.g. to FAILED).
+
 ## Memory Storage
 
 In-memory implementation for testing and development.
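
To make the documented contract of `increment_recovery_attempts` concrete, here is a minimal sketch of the counter semantics: add one to `recovery_attempts`, and update the status only when `new_status` is given. It is synchronous for brevity, whereas the interface above is async. `SagaRecord`, `InMemoryAttempts`, and `is_eligible` are hypothetical names, and statuses are plain strings rather than `SagaStatus` members.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class SagaRecord:  # hypothetical record, not the library's model
    status: str = "RUNNING"
    recovery_attempts: int = 0


class InMemoryAttempts:
    """Toy store illustrating only the recovery-attempts bookkeeping."""

    def __init__(self):
        self.records = {}

    def increment_recovery_attempts(self, saga_id,
                                    new_status: Optional[str] = None) -> None:
        record = self.records[saga_id]
        record.recovery_attempts += 1
        if new_status is not None:  # status change is optional
            record.status = new_status

    def is_eligible(self, saga_id, max_recovery_attempts=5) -> bool:
        # Mirrors the documented "strictly less than" rule.
        return self.records[saga_id].recovery_attempts < max_recovery_attempts


store = InMemoryAttempts()
sid = uuid.uuid4()
store.records[sid] = SagaRecord()
for _ in range(5):  # five failed recoveries in a row
    store.increment_recovery_attempts(sid, new_status="FAILED")
```

After the fifth failure the saga is no longer eligible under the default limit of 5, which is how persistently failing sagas drop out of later recovery runs.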
@@ -78,6 +83,7 @@ Database-backed implementation for production. It uses a session factory to mana
 - `status` (VARCHAR) - PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
 - `context` (JSON)
 - `version` (INTEGER) - Optimistic locking version (default: 1)
+- `recovery_attempts` (INTEGER) - Number of failed recovery attempts (default: 0); used by `get_sagas_for_recovery` and `increment_recovery_attempts`
 - `created_at`, `updated_at` (TIMESTAMP)
 
 **saga_logs:**
