-
Notifications
You must be signed in to change notification settings - Fork 5
[Refactor] SQLAlchemy outbox model. #66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c9349cc
5977dd2
bb538d9
39e4c8f
99485f8
5e6e5eb
eed1e31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,29 @@ | ||
| import datetime | ||
| import logging | ||
| import typing | ||
|
|
||
| import dotenv | ||
| import orjson | ||
| import sqlalchemy | ||
| from sqlalchemy import func | ||
| from sqlalchemy.dialects import mysql | ||
| from sqlalchemy.ext.asyncio import session as sql_session | ||
| from sqlalchemy.orm import DeclarativeMeta, registry | ||
|
|
||
| import cqrs | ||
| import uuid | ||
| from cqrs import compressors | ||
| from cqrs.outbox import map, repository | ||
|
|
||
| try: | ||
| import sqlalchemy | ||
|
|
||
| from sqlalchemy import func | ||
| from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry | ||
| from sqlalchemy.ext.asyncio import session as sql_session | ||
| from sqlalchemy.dialects import postgresql | ||
| except ImportError: | ||
| raise ImportError( | ||
| "You are trying to use SQLAlchemy outbox implementation, " | ||
| "but 'sqlalchemy' is not installed. " | ||
| "Please install it using: pip install python-cqrs[sqlalchemy]", | ||
| ) from None | ||
|
|
||
|
Comment on lines
+12
to
+25
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: cat -n src/cqrs/outbox/sqlalchemy.py | head -30Repository: pypatterns/python-cqrs Length of output: 1011 🏁 Script executed: python3 << 'EOF'
# Verify ModuleNotFoundError behavior and attributes
import sys
# Test 1: Check ModuleNotFoundError is subclass of ImportError
print(f"ModuleNotFoundError is subclass of ImportError: {issubclass(ModuleNotFoundError, ImportError)}")
# Test 2: Check what attributes ModuleNotFoundError has
try:
import nonexistent_module_xyz_123
except ModuleNotFoundError as e:
print(f"\nModuleNotFoundError attributes:")
print(f" - name: {getattr(e, 'name', 'NOT FOUND')}")
print(f" - path: {getattr(e, 'path', 'NOT FOUND')}")
print(f" - type(e): {type(e).__name__}")
# Test 3: Check what happens with nested ImportError from within a module
print(f"\n--- Test 3: Nested import failure ---")
# Create a test scenario where a submodule fails to import
try:
# This will raise ModuleNotFoundError because 'sqlalchemy' doesn't exist
import sqlalchemy
from sqlalchemy import nonexistent_submodule
except ModuleNotFoundError as e:
print(f"Error name: {e.name}")
except ImportError as e:
print(f"Got ImportError: {e}")
# Test 4: Verify that nested imports raise ModuleNotFoundError when the top-level module doesn't exist
print(f"\n--- Test 4: Checking if nested missing imports raise ModuleNotFoundError ---")
try:
from nonexistent.nested.module import something
except ModuleNotFoundError as e:
print(f"Caught ModuleNotFoundError with name='{e.name}'")
except Exception as e:
print(f"Caught {type(e).__name__}: {e}")
EOFRepository: pypatterns/python-cqrs Length of output: 427 🏁 Script executed: # Check for other ImportError patterns in this file
rg "except.*Import" src/cqrs/outbox/sqlalchemy.py -A 3Repository: pypatterns/python-cqrs Length of output: 221 🏁 Script executed: # Also check other files in the outbox directory for similar patterns
find src/cqrs/outbox -name "*.py" -type f | head -20Repository: pypatterns/python-cqrs Length of output: 200 🏁 Script executed: # Check for ImportError patterns in all outbox files
rg "except.*Import" src/cqrs/outbox/ -A 5Repository: pypatterns/python-cqrs Length of output: 489 🏁 Script executed: # Test what happens when a submodule within an existing package has an import error
python3 << 'EOF'
# Simulate what happens when sqlalchemy exists but one of its submodules fails
# We'll create a mock scenario
# Test 1: What type of error when trying to import from a missing submodule of an existing package?
import sys
import types
# Create a fake sqlalchemy module that exists but one of its imports fails internally
fake_sqlalchemy = types.ModuleType('fake_sqlalchemy_test')
# Create a submodule that will fail on import
def failing_import():
raise ImportError("Internal import failed within sqlalchemy")
# This simulates a scenario where sqlalchemy itself imports but a submodule fails
try:
# Simulate the error that would occur
raise ImportError("Some internal sqlalchemy import issue")
except Exception as e:
print(f"Internal import error type: {type(e).__name__}")
print(f"Has 'name' attribute: {hasattr(e, 'name')}")
if hasattr(e, 'name'):
print(f"name value: {e.name}")
# The key insight: internal import failures may raise ImportError (not ModuleNotFoundError)
# if they're caused by issues within the module, not because the module doesn't exist
print("\n--- Key difference ---")
print("ModuleNotFoundError: raised when a module cannot be found")
print("ImportError: can be raised for other import-related issues (syntax errors, circular imports, etc.)")
print("The proposed fix may NOT catch all internal import failures")
EOFRepository: pypatterns/python-cqrs Length of output: 390 🏁 Script executed: # Test nested import scenarios more realistically
python3 << 'EOF'
# Test scenario: what happens with nested submodule imports
# Scenario 1: Top-level module missing
print("=== Scenario 1: Top-level module missing ===")
try:
import nonexistent_package_xyz
except ModuleNotFoundError as e:
print(f"Exception: {type(e).__name__}")
print(f"name: {e.name}")
# Scenario 2: Simulate nested module import attempt
# When you do: from sqlalchemy import func
# If sqlalchemy doesn't exist, what happens?
print("\n=== Scenario 2: What happens with from X import Y when X is missing ===")
try:
from nonexistent_package import something
except ModuleNotFoundError as e:
print(f"Exception: {type(e).__name__}")
print(f"name: {e.name}")
print(f"The 'name' attribute correctly identifies: {e.name}")
# Scenario 3: Multiple imports - what if second one fails?
print("\n=== Scenario 3: Multiple imports where first succeeds, second fails ===")
try:
import sys # this exists
from nonexistent_module import something # this doesn't
except ModuleNotFoundError as e:
print(f"Exception: {type(e).__name__}")
print(f"name: {e.name}")
# The key insight: The proposed fix handles all these correctly
# because ModuleNotFoundError for any missing module will have name set
# and the check `if exc.name != "sqlalchemy"` will re-raise non-sqlalchemy errors
print("\n=== Analysis ===")
print("Proposed fix catches: ModuleNotFoundError with name='sqlalchemy'")
print("Proposed fix re-raises: ModuleNotFoundError with name!='sqlalchemy'")
print("Proposed fix does NOT catch: other ImportError subclasses (AttributeError, etc.)")
EOFRepository: pypatterns/python-cqrs Length of output: 725 🏁 Script executed: # Check all outbox files for similar import error handling patterns
for file in src/cqrs/outbox/*.py; do
echo "=== $file ==="
rg "except.*Import" "$file" -A 3 || echo "No ImportError patterns found"
doneRepository: pypatterns/python-cqrs Length of output: 516 🏁 Script executed: # Also check for similar patterns elsewhere in the codebase
rg "except ImportError" src/ --include="*.py" | head -20Repository: pypatterns/python-cqrs Length of output: 507 🏁 Script executed: # Search for similar ImportError patterns in the codebase
rg "except ImportError" src/ -A 3 | head -30Repository: pypatterns/python-cqrs Length of output: 1571 🏁 Script executed: # Examine the sqlalchemy.py and saga storage sqlalchemy.py patterns more closely
echo "=== Outbox sqlalchemy.py ===" && sed -n '12,24p' src/cqrs/outbox/sqlalchemy.py
echo -e "\n=== Saga storage sqlalchemy.py ===" && rg "except ImportError" src/cqrs/saga/storage/sqlalchemy.py -B 10 -A 6Repository: pypatterns/python-cqrs Length of output: 1274 🏁 Script executed: # Final verification: test the proposed fix logic more thoroughly
python3 << 'EOF'
# Test the proposed fix logic comprehensively
print("=== Testing proposed fix logic ===\n")
# Simulating the proposed fix behavior
def test_proposed_fix(exception_to_raise):
"""Simulates the proposed fix"""
try:
if exception_to_raise == "missing_sqlalchemy":
raise ModuleNotFoundError("No module named 'sqlalchemy'", name='sqlalchemy')
elif exception_to_raise == "missing_submodule":
raise ModuleNotFoundError("No module named 'sqlalchemy.orm'", name='sqlalchemy')
elif exception_to_raise == "missing_other":
raise ModuleNotFoundError("No module named 'other_package'", name='other_package')
except ModuleNotFoundError as exc:
if exc.name != "sqlalchemy":
print(f"✓ Re-raised: {type(exc).__name__} for {exc.name}")
raise
print(f"✓ Caught: {type(exc).__name__} for {exc.name}")
return "custom message"
test_cases = [
("missing_sqlalchemy", "Should catch and show custom message"),
("missing_submodule", "Should catch (name=sqlalchemy)"),
("missing_other", "Should re-raise"),
]
for case, expected in test_cases:
print(f"Case: {case} - {expected}")
try:
result = test_proposed_fix(case)
if result:
print(f" Result: Custom message shown\n")
except ModuleNotFoundError as e:
print(f" Result: Re-raised ModuleNotFoundError\n")
print("=== Conclusion ===")
print("The proposed fix correctly:")
print("1. Catches sqlalchemy ModuleNotFoundError (name='sqlalchemy')")
print("2. Re-raises ModuleNotFoundError for other packages (name!='sqlalchemy')")
print("3. Narrows exception scope from all ImportError to just ModuleNotFoundError")
EOFRepository: pypatterns/python-cqrs Length of output: 764 Use a targeted missing-package check instead of broad This block catches all Proposed fix try:
import sqlalchemy
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeMeta, registry
from sqlalchemy.ext.asyncio import session as sql_session
from sqlalchemy.dialects import postgresql
-except ImportError:
- raise ImportError(
+except ModuleNotFoundError as exc:
+ if exc.name != "sqlalchemy":
+ raise
+ raise ImportError(
"You are trying to use SQLAlchemy outbox implementation, "
"but 'sqlalchemy' is not installed. "
"Please install it using: pip install python-cqrs[sqlalchemy]",
) from None🤖 Prompt for AI Agents |
||
|
|
||
| Base = registry().generate_base() | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -24,6 +35,39 @@ | |
| MAX_FLUSH_COUNTER_VALUE = 5 | ||
|
|
||
|
|
||
| class BinaryUUID(sqlalchemy.TypeDecorator): | ||
| """Stores the UUID as a native UUID in Postgres and as BINARY(16) in other databases (MySQL).""" | ||
|
|
||
| impl = sqlalchemy.BINARY(16) | ||
| cache_ok = True | ||
|
|
||
| def load_dialect_impl(self, dialect): | ||
| if dialect.name == "postgresql": | ||
| return dialect.type_descriptor(postgresql.UUID()) | ||
| else: | ||
| return dialect.type_descriptor(sqlalchemy.BINARY(16)) | ||
|
|
||
| def process_bind_param(self, value, dialect): | ||
| if value is None: | ||
| return value | ||
| if isinstance(value, str): | ||
| value = uuid.UUID(value) | ||
| if dialect.name == "postgresql": | ||
| return value # asyncpg works with uuid.UUID | ||
| if isinstance(value, uuid.UUID): | ||
| return value.bytes # For MySQL return 16 bytes | ||
| return value | ||
|
|
||
| def process_result_value(self, value, dialect): | ||
| if value is None: | ||
| return value | ||
| if dialect.name == "postgresql": | ||
| return value # asyncpg return uuid.UUID | ||
| if isinstance(value, bytes): | ||
| return uuid.UUID(bytes=value) # From MySQL got bytes, make UUID | ||
| return value | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class OutboxModel(Base): | ||
| __tablename__ = DEFAULT_OUTBOX_TABLE_NAME | ||
|
|
||
|
|
@@ -34,57 +78,56 @@ class OutboxModel(Base): | |
| name="event_id_unique_index", | ||
| ), | ||
| ) | ||
| id = sqlalchemy.Column( | ||
| sqlalchemy.BigInteger(), | ||
| id: Mapped[int] = mapped_column( | ||
| sqlalchemy.BigInteger, | ||
| sqlalchemy.Identity(), | ||
| primary_key=True, | ||
| nullable=False, | ||
| autoincrement=True, | ||
| comment="Identity", | ||
| ) | ||
| event_id = sqlalchemy.Column( | ||
| sqlalchemy.Uuid, | ||
| event_id: Mapped[uuid.UUID] = mapped_column( | ||
| BinaryUUID, | ||
| nullable=False, | ||
| comment="Event idempotency id", | ||
| ) | ||
| event_id_bin = sqlalchemy.Column( | ||
| event_id_bin: Mapped[bytes] = mapped_column( | ||
| sqlalchemy.BINARY(16), | ||
| nullable=False, | ||
| comment="Event idempotency id in 16 bit presentation", | ||
| ) | ||
| event_status = sqlalchemy.Column( | ||
| event_status: Mapped[repository.EventStatus] = mapped_column( | ||
| sqlalchemy.Enum(repository.EventStatus), | ||
| nullable=False, | ||
| default=repository.EventStatus.NEW, | ||
| comment="Event producing status", | ||
| ) | ||
| flush_counter = sqlalchemy.Column( | ||
| sqlalchemy.SmallInteger(), | ||
| flush_counter: Mapped[int] = mapped_column( | ||
| sqlalchemy.SmallInteger, | ||
| nullable=False, | ||
| default=0, | ||
| comment="Event producing flush counter", | ||
| ) | ||
| event_name = sqlalchemy.Column( | ||
| event_name: Mapped[typing.Text] = mapped_column( | ||
| sqlalchemy.String(255), | ||
| nullable=False, | ||
| comment="Event name", | ||
| ) | ||
| topic = sqlalchemy.Column( | ||
| topic: Mapped[typing.Text] = mapped_column( | ||
| sqlalchemy.String(255), | ||
| nullable=False, | ||
| comment="Event topic", | ||
| default="", | ||
| ) | ||
| created_at = sqlalchemy.Column( | ||
| created_at: Mapped[datetime.datetime] = mapped_column( | ||
| sqlalchemy.DateTime, | ||
| nullable=False, | ||
| server_default=func.now(), | ||
| comment="Event creation timestamp", | ||
| ) | ||
| payload = sqlalchemy.Column( | ||
| mysql.BLOB, | ||
| payload: Mapped[bytes] = mapped_column( | ||
| sqlalchemy.LargeBinary, | ||
| nullable=False, | ||
| default={}, | ||
| comment="Event payload", | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
|
|
@@ -174,7 +217,7 @@ def add( | |
| self.session.add( | ||
| OutboxModel( | ||
| event_id=event.event_id, | ||
| event_id_bin=func.UUID_TO_BIN(event.event_id), | ||
| event_id_bin=event.event_id.bytes, | ||
| event_name=event.event_name, | ||
| created_at=event.event_timestamp, | ||
| payload=bytes_payload, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,17 +6,24 @@ | |
| import uuid | ||
|
|
||
| import dotenv | ||
| import sqlalchemy | ||
| from sqlalchemy import func | ||
| from sqlalchemy.exc import SQLAlchemyError | ||
| from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker | ||
| from sqlalchemy.orm import registry | ||
|
|
||
| from cqrs.dispatcher.exceptions import SagaConcurrencyError | ||
| from cqrs.saga.storage.enums import SagaStatus, SagaStepStatus | ||
| from cqrs.saga.storage.models import SagaLogEntry | ||
| from cqrs.saga.storage.protocol import ISagaStorage, SagaStorageRun | ||
|
|
||
| try: | ||
| import sqlalchemy | ||
| from sqlalchemy import func | ||
| from sqlalchemy.exc import SQLAlchemyError | ||
| from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker | ||
| from sqlalchemy.orm import registry | ||
| except ImportError: | ||
| raise ImportError( | ||
| "You are trying to use SQLAlchemy saga storage implementation, " | ||
| "but 'sqlalchemy' is not installed. " | ||
| "Please install it using: pip install python-cqrs[sqlalchemy]", | ||
| ) from None | ||
|
Comment on lines
+14
to
+25
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: cat -n src/cqrs/saga/storage/sqlalchemy.py | head -30Repository: pypatterns/python-cqrs Length of output: 1179 🌐 Web query:
💡 Result:
|
||
|
|
||
| Base = registry().generate_base() | ||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.