
Commit 37e723a

Authored by vadikko2 (Вадим Козыревский) and coderabbitai[bot]

Reduce saga storage overhead (#61)

* Try to reduce saga storage overhead
* Increase version
* Rename benchmarks
* 📝 Add docstrings to `reduce-saga-storage-overhead` (#62)
* Fixes after review
* Fix deadlocks
* Fix after pre-commit
* Fix ruff format
* Further fixes after review

Co-authored-by: Вадим Козыревский <v.kozyrevskiy@timeweb.ru>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

1 parent ce4dbdf commit 37e723a

50 files changed

Lines changed: 2835 additions & 930 deletions


.github/workflows/codspeed.yml

Lines changed: 13 additions & 0 deletions
```diff
@@ -46,6 +46,19 @@ jobs:
           echo "MySQL did not become ready in time"
           exit 1

+      - name: Wait for PostgreSQL
+        run: |
+          for i in $(seq 1 30); do
+            if docker compose -f docker-compose-test.yml exec -T postgres_tests pg_isready -h localhost -U cqrs -q 2>/dev/null; then
+              echo "PostgreSQL is ready"
+              exit 0
+            fi
+            echo "Waiting for PostgreSQL... ($i/30)"
+            sleep 2
+          done
+          echo "PostgreSQL did not become ready in time"
+          exit 1
+
       - name: Wait for Redis
         run: |
           for i in $(seq 1 15); do
```
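The readiness loop added above is a standard bounded-retry pattern: probe, report progress, sleep, and fail after a fixed attempt budget. A minimal Python sketch of the same idea (illustrative only, not part of the workflow):

```python
import time


def wait_until_ready(check, attempts: int = 30, delay: float = 2.0) -> bool:
    """Poll `check` until it reports ready or the attempt budget runs out.

    Mirrors the shell loop above: try, report progress, sleep,
    and give up after a bounded number of attempts.
    """
    for i in range(1, attempts + 1):
        if check():
            print("service is ready")
            return True
        print(f"Waiting for service... ({i}/{attempts})")
        time.sleep(delay)
    print("service did not become ready in time")
    return False


# A fake probe that becomes ready on the third call:
calls = iter([False, False, True])
print(wait_until_ready(lambda: next(calls), attempts=5, delay=0.0))  # True
```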

.github/workflows/tests.yml

Lines changed: 18 additions & 0 deletions
```diff
@@ -69,6 +69,7 @@ jobs:
         vermin --target=3.10- --violations --eval-annotations --backport typing_extensions --exclude=venv --exclude=build --exclude=.git --exclude=.venv src examples tests

   test:
+    name: test (py ${{ matrix.python-version }})
     runs-on: ubuntu-latest
     strategy:
       fail-fast: false
@@ -105,6 +106,19 @@ jobs:
           echo "MySQL did not become ready in time"
           exit 1

+      - name: Wait for PostgreSQL
+        run: |
+          for i in $(seq 1 30); do
+            if docker compose -f docker-compose-test.yml exec -T postgres_tests pg_isready -h localhost -U cqrs -q 2>/dev/null; then
+              echo "PostgreSQL is ready"
+              exit 0
+            fi
+            echo "Waiting for PostgreSQL... ($i/30)"
+            sleep 2
+          done
+          echo "PostgreSQL did not become ready in time"
+          exit 1
+
       - name: Wait for Redis
         run: |
           for i in $(seq 1 15); do
@@ -119,6 +133,10 @@ jobs:
           exit 1

       - name: Run all tests with coverage
+        env:
+          DATABASE_DSN: mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs
+          DATABASE_DSN_MYSQL: mysql+asyncmy://cqrs:cqrs@localhost:3307/test_cqrs
+          DATABASE_DSN_POSTGRESQL: postgresql+asyncpg://cqrs:cqrs@localhost:5433/cqrs
         run: |
           pytest -c ./tests/pytest-config.ini --cov=src --cov-report=xml --cov-report=term -o cache_dir=/tmp/pytest_cache ./tests/unit ./tests/integration
```
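The DSNs exported above follow standard URL syntax, so they can be sanity-checked with the stdlib alone (a quick check, not part of the workflow):

```python
from urllib.parse import urlsplit

# One of the DSNs added to the workflow env above:
dsn = "postgresql+asyncpg://cqrs:cqrs@localhost:5433/cqrs"
parts = urlsplit(dsn)

assert parts.scheme == "postgresql+asyncpg"  # dialect+driver
assert parts.username == "cqrs" and parts.password == "cqrs"
assert parts.hostname == "localhost" and parts.port == 5433
assert parts.path.lstrip("/") == "cqrs"      # database name
print("DSN looks well-formed")
```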

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
```diff
@@ -9,7 +9,7 @@ repos:
       - id: check-added-large-files
       - args:
           - --pytest-test-first
-        exclude: (^tests/mock/|^tests/integration/|^tests/fixtures)
+        exclude: (^tests/mock/|^tests/integration/|^tests/fixtures|conftest\.py$)
         id: name-tests-test
       - id: check-merge-conflict
       - id: check-json
```
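The effect of the new `conftest\.py$` alternative can be checked directly with Python's `re` module (the example paths are hypothetical, for illustration):

```python
import re

# The hook's exclude pattern after this change:
exclude = re.compile(r"(^tests/mock/|^tests/integration/|^tests/fixtures|conftest\.py$)")

assert exclude.search("tests/unit/conftest.py")      # now skipped by name-tests-test
assert exclude.search("tests/mock/anything.py")      # skipped as before
assert not exclude.search("tests/unit/helpers.py")   # still checked
print("exclude pattern behaves as expected")
```

`conftest.py` files are pytest fixtures rather than test modules, so exempting them from the test-first naming check avoids false positives.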

README.md

Lines changed: 85 additions & 96 deletions
```diff
@@ -43,29 +43,29 @@

 ## Overview

-This is a package for implementing the CQRS (Command Query Responsibility Segregation) pattern in Python applications.
-It provides a set of abstractions and utilities to help separate read and write use cases, ensuring better scalability,
-performance, and maintainability of the application.
+An event-driven framework for building distributed systems in Python. It centers on CQRS (Command Query Responsibility Segregation) and extends into messaging, sagas, and reliable event delivery — so you can separate read and write flows, react to events from the bus, run distributed transactions with compensation, and publish events via Transaction Outbox. The result is clearer structure, better scalability, and easier evolution of the application.

 This package is a fork of the [diator](https://github.com/akhundMurad/diator)
-project ([documentation](https://akhundmurad.github.io/diator/)) with several enhancements:
-
-1. Support for Pydantic [v2.*](https://docs.pydantic.dev/2.8/);
-2. `Kafka` support using [aiokafka](https://github.com/aio-libs/aiokafka);
-3. Added `EventMediator` for handling `Notification` and `ECST` events coming from the bus;
-4. Redesigned the event and request mapping mechanism to handlers;
-5. Added `bootstrap` for easy setup;
-6. Added support for [Transaction Outbox](https://microservices.io/patterns/data/transactional-outbox.html), ensuring
-   that `Notification` and `ECST` events are sent to the broker;
-7. FastAPI supporting;
-8. FastStream supporting;
-9. [Protobuf](https://protobuf.dev/) events supporting;
-10. `StreamingRequestMediator` and `StreamingRequestHandler` for handling streaming requests with real-time progress updates;
-11. Parallel event processing with configurable concurrency limits;
-12. Chain of Responsibility pattern support with `CORRequestHandler` for processing requests through multiple handlers in sequence;
-13. Orchestrated Saga pattern support for managing distributed transactions with automatic compensation and recovery mechanisms;
-14. Built-in Mermaid diagram generation, enabling automatic generation of Sequence and Class diagrams for documentation and visualization;
-15. Flexible Request and Response types support - use Pydantic-based or Dataclass-based implementations, with the ability to mix and match types based on your needs.
+project ([documentation](https://akhundmurad.github.io/diator/)) with several enhancements, ordered by importance:
+
+**Core framework**
+
+1. Redesigned the event and request mapping mechanism to handlers;
+2. `EventMediator` for handling `Notification` and `ECST` events coming from the bus;
+3. `bootstrap` for easy setup;
+4. **Transaction Outbox**, ensuring that `Notification` and `ECST` events are sent to the broker;
+5. **Orchestrated Saga** pattern for distributed transactions with automatic compensation and recovery;
+6. `StreamingRequestMediator` and `StreamingRequestHandler` for streaming requests with real-time progress updates;
+7. **Chain of Responsibility** with `CORRequestHandler` for processing requests through multiple handlers in sequence;
+8. **Parallel event processing** with configurable concurrency limits.
+
+**Also**
+
+- **Typing:** Pydantic [v2.*](https://docs.pydantic.dev/2.8/) and `IRequest`/`IResponse` interfaces — use Pydantic-based, dataclass-based, or custom Request/Response implementations.
+- **Broker:** Kafka via [aiokafka](https://github.com/aio-libs/aiokafka).
+- **Integration:** Ready for integration with FastAPI and FastStream.
+- **Documentation:** Built-in Mermaid diagram generation (Sequence and Class diagrams).
+- **Protobuf:** Interface-level support for converting Notification events to Protobuf and back.

 ## Request Handlers
```

```diff
@@ -87,7 +87,7 @@ class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):

     def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
         self._meetings_api = meetings_api
-        self.events: list[Event] = []
+        self._events: list[Event] = []

     @property
     def events(self) -> typing.List[events.Event]:
@@ -115,7 +115,7 @@ class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryR

     def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
         self._meetings_api = meetings_api
-        self.events: list[Event] = []
+        self._events: list[Event] = []

     @property
     def events(self) -> typing.List[events.Event]:
```
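The rename from `self.events` to `self._events` matters because these handler classes also define a read-only `events` property; assigning to `self.events` in `__init__` collides with it and raises at construction time. A minimal reproduction (simplified types, not the package's actual classes):

```python
import typing


class Broken:
    def __init__(self) -> None:
        self.events: typing.List[str] = []  # collides with the property below

    @property
    def events(self) -> typing.List[str]:
        return []


class Fixed:
    def __init__(self) -> None:
        self._events: typing.List[str] = []  # private storage, no collision

    @property
    def events(self) -> typing.List[str]:
        return self._events


try:
    Broken()
except AttributeError:
    print("Broken: cannot assign over a read-only property")

h = Fixed()
h._events.append("MeetingJoined")
print(h.events)  # ['MeetingJoined']
```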
````diff
@@ -304,6 +304,63 @@ class CustomResponse(cqrs.IResponse):

 A complete example can be found in [request_response_types.py](https://github.com/vadikko2/cqrs/blob/master/examples/request_response_types.py)

+## Mapping
+
+To bind commands, queries and events with specific handlers, you can use the registries `EventMap` and `RequestMap`.
+
+```python
+from cqrs import requests, events
+
+from app import commands, command_handlers
+from app import queries, query_handlers
+from app import events as event_models, event_handlers
+
+
+def init_commands(mapper: requests.RequestMap) -> None:
+    mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)
+
+def init_queries(mapper: requests.RequestMap) -> None:
+    mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)
+
+def init_events(mapper: events.EventMap) -> None:
+    mapper.bind(events.NotificationEvent[event_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
+    mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
+```
+
+## Bootstrap
+
+The `python-cqrs` package implements a set of bootstrap utilities designed to simplify the initial configuration of an
+application.
+
+```python
+import functools
+
+from cqrs.events import bootstrap as event_bootstrap
+from cqrs.requests import bootstrap as request_bootstrap
+
+from app import dependencies, mapping, orm
+
+
+@functools.lru_cache
+def mediator_factory():
+    return request_bootstrap.bootstrap(
+        di_container=dependencies.setup_di(),
+        commands_mapper=mapping.init_commands,
+        queries_mapper=mapping.init_queries,
+        domain_events_mapper=mapping.init_events,
+        on_startup=[orm.init_store_event_mapper],
+    )
+
+
+@functools.lru_cache
+def event_mediator_factory():
+    return event_bootstrap.bootstrap(
+        di_container=dependencies.setup_di(),
+        events_mapper=mapping.init_events,
+        on_startup=[orm.init_store_event_mapper],
+    )
+```
+
 ## Saga Pattern

 The package implements the Orchestrated Saga pattern for managing distributed transactions across multiple services or operations.
````
```diff
@@ -689,17 +746,7 @@ loop.run_until_complete(periodically_task())
 A complete example can be found in
 the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/kafka_outboxed_event_producing.py)

-## Transaction log tailing
-
-If the Outbox polling strategy does not suit your needs, I recommend exploring
-the [Transaction Log Tailing](https://microservices.io/patterns/data/transaction-log-tailing.html) pattern.
-The current version of the python-cqrs package does not support the implementation of this pattern.
-
-> [!TIP]
-> However, it can be implemented
-> using [Debezium + Kafka Connect](https://debezium.io/documentation/reference/stable/architecture.html),
-> which allows you to produce all newly created events within the Outbox storage directly to the corresponding topic in
-> Kafka (or any other broker).
+**Transaction log tailing.** If Outbox polling does not suit you, consider [Transaction Log Tailing](https://microservices.io/patterns/data/transaction-log-tailing.html). The package does not implement it; you can use [Debezium + Kafka Connect](https://debezium.io/documentation/reference/stable/architecture.html) to tail the Outbox and produce events to Kafka.

 ## DI container
```
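For contrast with log tailing, the Outbox polling strategy this README section refers to can be sketched in a few lines. The sqlite store and `outbox` schema below are stand-ins for illustration, not the package's actual storage or API:

```python
import sqlite3


def drain_outbox(conn: sqlite3.Connection, publish, batch: int = 10) -> int:
    """Poll unsent outbox rows, publish each one, then mark it sent."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE sent = 0 ORDER BY id LIMIT ?",
        (batch,),
    ).fetchall()
    for row_id, payload in rows:
        publish(payload)  # e.g. produce to Kafka
        conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, payload TEXT, sent INTEGER DEFAULT 0)")
conn.executemany("INSERT INTO outbox (payload) VALUES (?)", [("e1",), ("e2",)])

published = []
print(drain_outbox(conn, published.append))  # 2
print(published)                             # ['e1', 'e2']
```

A log-tailing setup (Debezium) would instead read these inserts from the database's write-ahead log, avoiding the poll loop entirely.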

````diff
@@ -765,65 +812,10 @@ Complete examples can be found in:
 - [Simple example](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_simple_example.py)
 - [Practical example with FastAPI](https://github.com/vadikko2/cqrs/blob/master/examples/dependency_injector_integration_practical_example.py)

-## Mapping
-
-To bind commands, queries and events with specific handlers, you can use the registries `EventMap` and `RequestMap`.
-
-```python
-from cqrs import requests, events
-
-from app import commands, command_handlers
-from app import queries, query_handlers
-from app import events as event_models, event_handlers
-
-
-def init_commands(mapper: requests.RequestMap) -> None:
-    mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)
-
-def init_queries(mapper: requests.RequestMap) -> None:
-    mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)
-
-def init_events(mapper: events.EventMap) -> None:
-    mapper.bind(events.NotificationEvent[events_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
-    mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)
-```
-
-## Bootstrap
-
-The `python-cqrs` package implements a set of bootstrap utilities designed to simplify the initial configuration of an
-application.
-
-```python
-import functools
-
-from cqrs.events import bootstrap as event_bootstrap
-from cqrs.requests import bootstrap as request_bootstrap
-
-from app import dependencies, mapping, orm
-
-
-@functools.lru_cache
-def mediator_factory():
-    return request_bootstrap.bootstrap(
-        di_container=dependencies.setup_di(),
-        commands_mapper=mapping.init_commands,
-        queries_mapper=mapping.init_queries,
-        domain_events_mapper=mapping.init_events,
-        on_startup=[orm.init_store_event_mapper],
-    )
-
-
-@functools.lru_cache
-def event_mediator_factory():
-    return event_bootstrap.bootstrap(
-        di_container=dependencies.setup_di(),
-        events_mapper=mapping.init_events,
-        on_startup=[orm.init_store_event_mapper],
-    )
-```
-
 ## Integration with presentation layers

+The framework is ready for integration with **FastAPI** and **FastStream**.
+
 > [!TIP]
 > I recommend reading the useful
 > paper [Onion Architecture Used in Software Development](https://www.researchgate.net/publication/371006360_Onion_Architecture_Used_in_Software_Development).
````
```diff
@@ -956,8 +948,5 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastap

 ## Protobuf messaging

-The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\
-Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data –
-think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use
-special generated source code to easily write and read your structured data to and from a variety of data streams and
-using a variety of languages.
+The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).
+There is interface-level support for converting Notification events to Protobuf and back. Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.
```

docker-compose-dev.yml

Lines changed: 10 additions & 0 deletions
```diff
@@ -15,6 +15,16 @@ services:
     command: --init-file /data/application/init.sql
     volumes:
       - ./tests/init_database.sql:/data/application/init.sql
+  postgres_dev:
+    image: postgres:15.4
+    hostname: postgres-dev
+    restart: always
+    environment:
+      POSTGRES_USER: cqrs
+      POSTGRES_PASSWORD: cqrs
+      POSTGRES_DB: cqrs
+    ports:
+      - "5433:5432"
   kafka0:
     image: confluentinc/cp-kafka:7.2.1
     hostname: kafka0
```

docker-compose-test.yml

Lines changed: 10 additions & 0 deletions
```diff
@@ -15,6 +15,16 @@ services:
     command: --init-file /data/application/init.sql
     volumes:
       - ./tests/init_database.sql:/data/application/init.sql
+  postgres_tests:
+    image: postgres:15.4
+    hostname: postgres-test
+    restart: always
+    environment:
+      POSTGRES_USER: cqrs
+      POSTGRES_PASSWORD: cqrs
+      POSTGRES_DB: cqrs
+    ports:
+      - "5433:5432"
   redis_tests:
     image: redis:7.2
     hostname: redis
```

examples/dependency_injector_integration_practical_example.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -351,9 +351,7 @@ def setup_logging() -> None:
     )

     # Add a StreamHandler if none exists
-    has_stream_handler = any(
-        isinstance(h, logging.StreamHandler) for h in root_logger.handlers
-    )
+    has_stream_handler = any(isinstance(h, logging.StreamHandler) for h in root_logger.handlers)
     if not has_stream_handler:
         stream_handler = logging.StreamHandler()
         stream_handler.setLevel(logging.DEBUG)
```

examples/kafka_event_consuming.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -158,9 +158,7 @@ def mediator_factory() -> cqrs.EventMediator:
     decoder=empty_message_decoder,
 )
 async def hello_world_event_handler(
-    body: cqrs.NotificationEvent[HelloWorldPayload]
-    | deserializers.DeserializeJsonError
-    | None,
+    body: cqrs.NotificationEvent[HelloWorldPayload] | deserializers.DeserializeJsonError | None,
     msg: kafka.KafkaMessage,
     mediator: cqrs.EventMediator = faststream.Depends(mediator_factory),
 ):
```

examples/request_response_types.py

Lines changed: 1 addition & 3 deletions
```diff
@@ -256,9 +256,7 @@ async def handle(self, request: GetUserQuery) -> UserDetailsResponse:
         raise ValueError(f"User {request.user_id} not found")

     user = USER_STORAGE[request.user_id]
-    total_orders = sum(
-        1 for order in ORDER_STORAGE.values() if order["user_id"] == request.user_id
-    )
+    total_orders = sum(1 for order in ORDER_STORAGE.values() if order["user_id"] == request.user_id)

     return UserDetailsResponse(
         user_id=user["user_id"],
```
