Extend Saga Storage interface with new improvements
Overview
This release extends the Saga Storage interface with two improvements for recovery workflows: explicit control over the recovery attempt counter and optional filtering of recovery candidates by saga name. Existing callers are unaffected; custom ISagaStorage implementations must add the new method and parameter (see Migration & Compatibility).
What's New
set_recovery_attempts: Explicit Recovery Counter
A new method on ISagaStorage and its implementations lets you set the recovery attempt counter to any value instead of only incrementing it.
Use cases:
| Scenario | Example |
|---|---|
| Reset after successful step recovery | After you successfully resume one step, reset the counter so the saga stays eligible for recovery: await storage.set_recovery_attempts(saga_id, 0) |
| Exclude from recovery without changing status | Mark a saga as "no more retries" by setting the counter to the max: await storage.set_recovery_attempts(saga_id, max_recovery_attempts). It is then no longer returned by get_sagas_for_recovery() when called with that limit |
Signature:

    async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None

Implemented in: MemorySagaStorage, SqlAlchemySagaStorage.
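The two use cases in the table above can be wrapped in small helpers. This is a minimal sketch, not part of the release: `RecordingStorage`, `reset_after_success`, and `stop_retrying` are hypothetical names, and the stand-in storage only records the values it receives.

```python
import asyncio
import uuid


class RecordingStorage:
    """Hypothetical stand-in that only records counters; not a real backend."""

    def __init__(self) -> None:
        self.attempts: dict[uuid.UUID, int] = {}

    async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None:
        self.attempts[saga_id] = attempts


async def reset_after_success(storage, saga_id: uuid.UUID) -> None:
    # A step was resumed successfully: start counting from zero again.
    await storage.set_recovery_attempts(saga_id, 0)


async def stop_retrying(storage, saga_id: uuid.UUID, max_recovery_attempts: int) -> None:
    # Park the counter at the limit so recovery queries using it skip the saga.
    await storage.set_recovery_attempts(saga_id, max_recovery_attempts)


async def demo() -> dict[uuid.UUID, int]:
    storage = RecordingStorage()
    healthy, poisoned = uuid.uuid4(), uuid.uuid4()
    await reset_after_success(storage, healthy)
    await stop_retrying(storage, poisoned, max_recovery_attempts=5)
    return storage.attempts
```

With a real backend you would pass a MemorySagaStorage or SqlAlchemySagaStorage instance instead of the stand-in; the helpers rely only on the signature shown above.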
get_sagas_for_recovery: Optional Filter by Saga Name
get_sagas_for_recovery() now accepts an optional saga_name parameter. You can run separate recovery jobs per saga type and only fetch the sagas that job is responsible for.
| saga_name | Behavior |
|---|---|
| None (default) | Returns all saga types, same as before; fully backward compatible |
| "OrderSaga" | Returns only sagas with name == "OrderSaga" |
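If a single scheduler handles several saga types, the per-name queries can also be issued concurrently. The sketch below assumes only the `saga_name` parameter described above; `FakeStorage` and `fetch_per_type` are hypothetical names, and the stand-in filters by name alone (no status, attempt, or staleness checks).

```python
import asyncio
import uuid


class FakeStorage:
    """Hypothetical stand-in that filters by saga name only."""

    def __init__(self, names_by_id: dict) -> None:
        self._names_by_id = names_by_id

    async def get_sagas_for_recovery(
        self, limit, max_recovery_attempts=5, stale_after_seconds=None, saga_name=None
    ):
        ids = [
            saga_id
            for saga_id, name in self._names_by_id.items()
            if saga_name is None or name == saga_name
        ]
        return ids[:limit]


async def fetch_per_type(storage, saga_names, limit=50):
    # One query per saga type; gather() returns results in argument order.
    batches = await asyncio.gather(
        *(storage.get_sagas_for_recovery(limit=limit, saga_name=name) for name in saga_names)
    )
    return dict(zip(saga_names, batches))
```

Each batch can then be handed to the worker responsible for that saga type.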
Example (one job per saga type):

    # Job 1: only OrderSaga
    ids = await storage.get_sagas_for_recovery(
        limit=50,
        max_recovery_attempts=5,
        saga_name="OrderSaga",
    )

    # Job 2: only PaymentSaga
    ids = await storage.get_sagas_for_recovery(
        limit=50,
        max_recovery_attempts=5,
        saga_name="PaymentSaga",
    )

Updated signature:

    async def get_sagas_for_recovery(
        self,
        limit: int,
        max_recovery_attempts: int = 5,
        stale_after_seconds: int | None = None,
        saga_name: str | None = None,  # NEW
    ) -> list[uuid.UUID]

Summary of Changes
| Area | Change |
|---|---|
| Protocol (ISagaStorage) | New method set_recovery_attempts(saga_id, attempts); get_sagas_for_recovery gains optional saga_name=None |
| SqlAlchemySagaStorage | Implements set_recovery_attempts; adds WHERE name = :saga_name when saga_name is set |
| MemorySagaStorage | Implements set_recovery_attempts; filters in-memory by data["name"] == saga_name when saga_name is set |
| Tests | New integration tests for set_recovery_attempts (set value, exclude from recovery, not found) and for saga_name (filter by name, None returns all types) in both storage backends |
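The conditional name filter described for the SQL backend can be illustrated with the standard-library sqlite3 module instead of SQLAlchemy. This is a sketch under assumptions: the table name `sagas`, its columns, and `fetch_recovery_ids` are hypothetical, and the real query likely also checks status and staleness.

```python
from __future__ import annotations

import sqlite3


def fetch_recovery_ids(
    conn: sqlite3.Connection,
    limit: int,
    max_recovery_attempts: int = 5,
    saga_name: str | None = None,
) -> list[str]:
    sql = "SELECT id FROM sagas WHERE recovery_attempts < :max_attempts"
    params: dict[str, object] = {"max_attempts": max_recovery_attempts, "limit": limit}
    if saga_name is not None:
        # The name condition is appended only when a filter was requested.
        sql += " AND name = :saga_name"
        params["saga_name"] = saga_name
    sql += " ORDER BY id LIMIT :limit"
    return [row[0] for row in conn.execute(sql, params)]


# Hypothetical schema and rows, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sagas (id TEXT PRIMARY KEY, name TEXT, recovery_attempts INTEGER)")
conn.executemany(
    "INSERT INTO sagas VALUES (?, ?, ?)",
    [("a", "OrderSaga", 0), ("b", "PaymentSaga", 0), ("c", "OrderSaga", 5)],
)
```

Leaving `saga_name` as None produces the same query as before the change, which is what keeps existing callers unaffected.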
Migration & Compatibility
- Existing code that calls get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...) continues to work unchanged; saga_name defaults to None.
- Custom storage implementations of ISagaStorage must implement the new abstract method set_recovery_attempts(saga_id, attempts) and add the optional saga_name parameter to get_sagas_for_recovery to satisfy the interface.
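For a custom backend, the two required additions might look like the sketch below. It does not import the real interface: `ISagaStorageSketch` is a stand-in for the two recovery members only, `DictSagaStorage` is a hypothetical backend, and raising KeyError for an unknown saga is an assumption (the shipped backends may use a different error).

```python
from __future__ import annotations

import abc
import asyncio
import uuid


class ISagaStorageSketch(abc.ABC):
    """Stand-in for the recovery members of ISagaStorage; other members omitted."""

    @abc.abstractmethod
    async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None: ...

    @abc.abstractmethod
    async def get_sagas_for_recovery(
        self,
        limit: int,
        max_recovery_attempts: int = 5,
        stale_after_seconds: int | None = None,
        saga_name: str | None = None,
    ) -> list[uuid.UUID]: ...


class DictSagaStorage(ISagaStorageSketch):
    """Hypothetical custom backend keeping one dict per saga."""

    def __init__(self) -> None:
        self.rows: dict[uuid.UUID, dict] = {}

    async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None:
        if saga_id not in self.rows:
            raise KeyError(saga_id)  # assumption: the real error type may differ
        self.rows[saga_id]["recovery_attempts"] = attempts

    async def get_sagas_for_recovery(
        self,
        limit: int,
        max_recovery_attempts: int = 5,
        stale_after_seconds: int | None = None,
        saga_name: str | None = None,
    ) -> list[uuid.UUID]:
        # Status and stale_after_seconds checks are left out of this sketch.
        matches = [
            saga_id
            for saga_id, row in self.rows.items()
            if row["recovery_attempts"] < max_recovery_attempts
            and (saga_name is None or row["name"] == saga_name)
        ]
        return matches[:limit]
```

Because saga_name is a keyword parameter with a None default, callers written against the old signature keep working with the updated implementation.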
Full Changelog
Added
- Saga Storage: method set_recovery_attempts(saga_id, attempts) to set the recovery attempt counter explicitly
- Saga Storage: optional parameter saga_name in get_sagas_for_recovery() for filtering recovery candidates by saga name
- Integration tests for set_recovery_attempts and saga_name filtering (Memory and SqlAlchemy)
Changed
- get_sagas_for_recovery() signature extended with saga_name: str | None = None (backward compatible)