Skip to content

Commit 0dfed4f

Browse files
[Refactor] SQLAlchemy outbox model. (#66)
* Make sqlalchemy an optional dependency. Implement BinaryUUID for PostgreSQL and MySQL compatibility. * compatibility * lint fix * lint fix * save sqlalchemy as required dependency * lint fix * review fixes --------- Co-authored-by: nikitakunov8@gmail.com <nikitakunov8@gmail.com>
1 parent 95aed36 commit 0dfed4f

3 files changed

Lines changed: 80 additions & 29 deletions

File tree

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,17 @@ dependencies = [
2121
"dependency-injector>=4.0",
2222
"orjson==3.*",
2323
"pydantic==2.*",
24+
"sqlalchemy[asyncio]==2.0.*",
2425
"python-dotenv==1.*",
2526
"retry-async==0.1.*",
26-
"sqlalchemy[asyncio]==2.0.*",
2727
"typing-extensions>=4.0"
2828
]
2929
description = "Event-Driven Architecture Framework for Distributed Systems"
3030
maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
3131
name = "python-cqrs"
3232
readme = "README.md"
3333
requires-python = ">=3.10"
34-
version = "4.10.0"
34+
version = "4.10.1"
3535

3636
[project.optional-dependencies]
3737
aiobreaker = ["aiobreaker>=0.3.0"]
@@ -68,6 +68,7 @@ examples = [
6868
]
6969
kafka = ["aiokafka==0.10.0"]
7070
rabbit = ["aio-pika==9.3.0"]
71+
sqlalchemy = ["sqlalchemy[asyncio]==2.0.*"]
7172

7273
[project.urls]
7374
Documentation = "https://mkdocs.python-cqrs.dev/"

src/cqrs/outbox/sqlalchemy.py

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,29 @@
1+
import datetime
12
import logging
23
import typing
34

45
import dotenv
56
import orjson
6-
import sqlalchemy
7-
from sqlalchemy import func
8-
from sqlalchemy.dialects import mysql
9-
from sqlalchemy.ext.asyncio import session as sql_session
10-
from sqlalchemy.orm import DeclarativeMeta, registry
11-
127
import cqrs
8+
import uuid
139
from cqrs import compressors
1410
from cqrs.outbox import map, repository
1511

12+
try:
13+
import sqlalchemy
14+
15+
from sqlalchemy import func
16+
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry
17+
from sqlalchemy.ext.asyncio import session as sql_session
18+
from sqlalchemy.dialects import postgresql
19+
except ImportError:
20+
raise ImportError(
21+
"You are trying to use SQLAlchemy outbox implementation, "
22+
"but 'sqlalchemy' is not installed. "
23+
"Please install it using: pip install python-cqrs[sqlalchemy]",
24+
) from None
25+
26+
1627
Base = registry().generate_base()
1728

1829
logger = logging.getLogger(__name__)
@@ -24,6 +35,39 @@
2435
MAX_FLUSH_COUNTER_VALUE = 5
2536

2637

38+
class BinaryUUID(sqlalchemy.TypeDecorator):
39+
"""Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL)."""
40+
41+
impl = sqlalchemy.BINARY(16)
42+
cache_ok = True
43+
44+
def load_dialect_impl(self, dialect):
45+
if dialect.name == "postgresql":
46+
return dialect.type_descriptor(postgresql.UUID())
47+
else:
48+
return dialect.type_descriptor(sqlalchemy.BINARY(16))
49+
50+
def process_bind_param(self, value, dialect):
51+
if value is None:
52+
return value
53+
if isinstance(value, str):
54+
value = uuid.UUID(value)
55+
if dialect.name == "postgresql":
56+
return value # asyncpg works with uuid.UUID
57+
if isinstance(value, uuid.UUID):
58+
return value.bytes # For MySQL return 16 bytes
59+
return value
60+
61+
def process_result_value(self, value, dialect):
62+
if value is None:
63+
return value
64+
if dialect.name == "postgresql":
65+
return value # asyncpg return uuid.UUID
66+
if isinstance(value, bytes):
67+
return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID
68+
return value
69+
70+
2771
class OutboxModel(Base):
2872
__tablename__ = DEFAULT_OUTBOX_TABLE_NAME
2973

@@ -34,57 +78,56 @@ class OutboxModel(Base):
3478
name="event_id_unique_index",
3579
),
3680
)
37-
id = sqlalchemy.Column(
38-
sqlalchemy.BigInteger(),
81+
id: Mapped[int] = mapped_column(
82+
sqlalchemy.BigInteger,
3983
sqlalchemy.Identity(),
4084
primary_key=True,
4185
nullable=False,
4286
autoincrement=True,
4387
comment="Identity",
4488
)
45-
event_id = sqlalchemy.Column(
46-
sqlalchemy.Uuid,
89+
event_id: Mapped[uuid.UUID] = mapped_column(
90+
BinaryUUID,
4791
nullable=False,
4892
comment="Event idempotency id",
4993
)
50-
event_id_bin = sqlalchemy.Column(
94+
event_id_bin: Mapped[bytes] = mapped_column(
5195
sqlalchemy.BINARY(16),
5296
nullable=False,
5397
comment="Event idempotency id in 16 bit presentation",
5498
)
55-
event_status = sqlalchemy.Column(
99+
event_status: Mapped[repository.EventStatus] = mapped_column(
56100
sqlalchemy.Enum(repository.EventStatus),
57101
nullable=False,
58102
default=repository.EventStatus.NEW,
59103
comment="Event producing status",
60104
)
61-
flush_counter = sqlalchemy.Column(
62-
sqlalchemy.SmallInteger(),
105+
flush_counter: Mapped[int] = mapped_column(
106+
sqlalchemy.SmallInteger,
63107
nullable=False,
64108
default=0,
65109
comment="Event producing flush counter",
66110
)
67-
event_name = sqlalchemy.Column(
111+
event_name: Mapped[typing.Text] = mapped_column(
68112
sqlalchemy.String(255),
69113
nullable=False,
70114
comment="Event name",
71115
)
72-
topic = sqlalchemy.Column(
116+
topic: Mapped[typing.Text] = mapped_column(
73117
sqlalchemy.String(255),
74118
nullable=False,
75119
comment="Event topic",
76120
default="",
77121
)
78-
created_at = sqlalchemy.Column(
122+
created_at: Mapped[datetime.datetime] = mapped_column(
79123
sqlalchemy.DateTime,
80124
nullable=False,
81125
server_default=func.now(),
82126
comment="Event creation timestamp",
83127
)
84-
payload = sqlalchemy.Column(
85-
mysql.BLOB,
128+
payload: Mapped[bytes] = mapped_column(
129+
sqlalchemy.LargeBinary,
86130
nullable=False,
87-
default={},
88131
comment="Event payload",
89132
)
90133

@@ -174,7 +217,7 @@ def add(
174217
self.session.add(
175218
OutboxModel(
176219
event_id=event.event_id,
177-
event_id_bin=func.UUID_TO_BIN(event.event_id),
220+
event_id_bin=event.event_id.bytes,
178221
event_name=event.event_name,
179222
created_at=event.event_timestamp,
180223
payload=bytes_payload,

src/cqrs/saga/storage/sqlalchemy.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,24 @@
66
import uuid
77

88
import dotenv
9-
import sqlalchemy
10-
from sqlalchemy import func
11-
from sqlalchemy.exc import SQLAlchemyError
12-
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
13-
from sqlalchemy.orm import registry
14-
159
from cqrs.dispatcher.exceptions import SagaConcurrencyError
1610
from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus
1711
from cqrs.saga.storage.models import SagaLogEntry
1812
from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun
1913

14+
try:
15+
import sqlalchemy
16+
from sqlalchemy import func
17+
from sqlalchemy.exc import SQLAlchemyError
18+
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
19+
from sqlalchemy.orm import registry
20+
except ImportError:
21+
raise ImportError(
22+
"You are trying to use SQLAlchemy saga storage implementation, "
23+
"but 'sqlalchemy' is not installed. "
24+
"Please install it using: pip install python-cqrs[sqlalchemy]",
25+
) from None
26+
2027
Base = registry().generate_base()
2128
logger = logging.getLogger(__name__)
2229

0 commit comments

Comments
 (0)