Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ PyMongoSQL implements the DB API 2.0 interfaces to provide SQL-like access to Mo
- **JMESPath** (JSON/Dict Path Query)
- jmespath >= 1.0.0

- **Tenacity** (Transient Failure Retry)
- tenacity >= 9.0.0

### Optional Dependencies

- **SQLAlchemy** (for ORM/Core support)
Expand Down Expand Up @@ -206,6 +209,22 @@ cursor.execute(

Parameters are substituted into the MongoDB filter during execution, providing protection against injection attacks.

### Retry on Transient System Errors

PyMongoSQL supports retrying transient, system-level MongoDB failures (for example connection timeout and reconnect errors) using Tenacity.

```python
connection = connect(
host="mongodb://localhost:27017/database",
retry_enabled=True, # default: True
retry_attempts=3, # default: 3
retry_wait_min=0.1, # default: 0.1 seconds
retry_wait_max=1.0, # default: 1.0 seconds
)
```

These options apply to connection ping checks, query/DML command execution, and paginated `getMore` fetches.

## Supported SQL Features

### SELECT Statements
Expand Down
111 changes: 110 additions & 1 deletion pymongosql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from .connection import Connection

__version__: str = "0.4.3"
__version__: str = "0.4.4"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel: str = "2.0"
Expand Down Expand Up @@ -36,6 +36,115 @@ def __hash__(self):
return frozenset.__hash__(self)


# DB API 2.0 Type Objects for MongoDB Data Types
# https://www.python.org/dev/peps/pep-0249/#type-objects-and-constructors
# Mapping of MongoDB BSON types to DB API 2.0 type objects

# Null/None type
NULL = DBAPITypeObject(("null", "Null", "NULL"))

# String types
STRING = DBAPITypeObject(("string", "str", "String", "VARCHAR", "CHAR", "TEXT"))

# Numeric types - Integer
BINARY = DBAPITypeObject(("binary", "Binary", "BINARY", "VARBINARY", "BLOB", "ObjectId"))

# Numeric types - Integer
NUMBER = DBAPITypeObject(("int", "integer", "long", "int32", "int64", "Integer", "BIGINT", "INT"))

# Numeric types - Decimal/Float
FLOAT = DBAPITypeObject(("double", "decimal", "float", "Double", "DECIMAL", "FLOAT", "NUMERIC"))

# Boolean type
BOOLEAN = DBAPITypeObject(("bool", "boolean", "Bool", "BOOLEAN"))

# Date/Time types
DATE = DBAPITypeObject(("date", "Date", "DATE"))
TIME = DBAPITypeObject(("time", "Time", "TIME"))
DATETIME = DBAPITypeObject(("datetime", "timestamp", "Timestamp", "DATETIME", "TIMESTAMP"))

# Aggregate types
ARRAY = DBAPITypeObject(("array", "Array", "ARRAY", "list"))
OBJECT = DBAPITypeObject(("object", "Object", "OBJECT", "struct", "dict", "document"))

# Special MongoDB types
OBJECTID = DBAPITypeObject(("objectid", "ObjectId", "OBJECTID", "oid"))
REGEX = DBAPITypeObject(("regex", "Regex", "REGEX", "regexp"))

# Map MongoDB BSON type codes to DB API type objects
# This mapping helps cursor.description identify the correct type for each column
_MONGODB_TYPE_MAP = {
"null": NULL,
"string": STRING,
"int": NUMBER,
"integer": NUMBER,
"long": NUMBER,
"int32": NUMBER,
"int64": NUMBER,
"double": FLOAT,
"decimal": FLOAT,
"float": FLOAT,
"bool": BOOLEAN,
"boolean": BOOLEAN,
"date": DATE,
"datetime": DATETIME,
"timestamp": DATETIME,
"array": ARRAY,
"object": OBJECT,
"document": OBJECT,
"bson.objectid": OBJECTID,
"objectid": OBJECTID,
"regex": REGEX,
"binary": BINARY,
}


def get_type_code(value: object) -> str:
"""Get the type code for a MongoDB value.

Maps a MongoDB/Python value to its corresponding DB API type code string.

Args:
value: The value to determine the type for

Returns:
A string representing the DB API type code
"""
if value is None:
return "null"
elif isinstance(value, bool):
return "bool"
elif isinstance(value, int):
return "int"
elif isinstance(value, float):
return "double"
elif isinstance(value, str):
return "string"
elif isinstance(value, bytes):
return "binary"
elif isinstance(value, dict):
return "object"
elif isinstance(value, list):
return "array"
elif hasattr(value, "__class__") and value.__class__.__name__ == "ObjectId":
return "objectid"
else:
return "object"


def get_type_object(value: object) -> DBAPITypeObject:
"""Get the DB API type object for a MongoDB value.

Args:
value: The value to get type information for

Returns:
A DBAPITypeObject representing the value's type
"""
type_code = get_type_code(value)
return _MONGODB_TYPE_MAP.get(type_code, OBJECT)


def connect(*args, **kwargs) -> "Connection":
from .connection import Connection

Expand Down
52 changes: 45 additions & 7 deletions pymongosql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .cursor import Cursor
from .error import DatabaseError, OperationalError
from .helper import ConnectionHelper
from .retry import RetryConfig, execute_with_retry

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,6 +56,10 @@ def __init__(
if not self._mode and mode:
self._mode = mode

# Retry behavior for transient system-level PyMongo failures.
# These kwargs are consumed by PyMongoSQL and are not passed to MongoClient.
self._retry_config = RetryConfig.from_kwargs(kwargs)

# Extract commonly used parameters for backward compatibility
self._host = host or "localhost"
self._port = port or 27017
Expand Down Expand Up @@ -109,7 +114,11 @@ def _connect(self) -> None:
self._client = MongoClient(**self._pymongo_params)

# Test connection
self._client.admin.command("ping")
execute_with_retry(
lambda: self._client.admin.command("ping"),
self._retry_config,
"initial MongoDB ping",
)

# Initialize the database according to explicit parameter or client's default
# This may raise OperationalError if no database could be determined; allow it to bubble up
Expand Down Expand Up @@ -179,6 +188,11 @@ def mode(self) -> str:
"""Get the specified mode"""
return self._mode

@property
def retry_config(self) -> RetryConfig:
"""Get retry configuration used for transient system-level errors."""
return self._retry_config

def use_database(self, database_name: str) -> None:
"""Switch to a different database"""
if self._client is None:
Expand Down Expand Up @@ -332,15 +346,23 @@ def _start_session(self, **kwargs) -> ClientSession:
if self._client is None:
raise OperationalError("No active connection")

session = self._client.start_session(**kwargs)
session = execute_with_retry(
lambda: self._client.start_session(**kwargs),
self._retry_config,
"start MongoDB session",
)
self._session = session
_logger.info("Started new MongoDB session")
return session

def _end_session(self) -> None:
"""End the current session (internal method)"""
if self._session is not None:
self._session.end_session()
execute_with_retry(
lambda: self._session.end_session(),
self._retry_config,
"end MongoDB session",
)
self._session = None
_logger.info("Ended MongoDB session")

Expand All @@ -357,7 +379,11 @@ def _start_transaction(self, **kwargs) -> None:
if self._session is None:
raise OperationalError("No active session")

self._session.start_transaction(**kwargs)
execute_with_retry(
lambda: self._session.start_transaction(**kwargs),
self._retry_config,
"start MongoDB transaction",
)
self._in_transaction = True
self._autocommit = False
_logger.info("Started MongoDB transaction")
Expand All @@ -370,7 +396,11 @@ def _commit_transaction(self) -> None:
if not self._session.in_transaction:
raise OperationalError("No active transaction to commit")

self._session.commit_transaction()
execute_with_retry(
lambda: self._session.commit_transaction(),
self._retry_config,
"commit MongoDB transaction",
)
self._in_transaction = False
self._autocommit = True
_logger.info("Committed MongoDB transaction")
Expand All @@ -383,7 +413,11 @@ def _abort_transaction(self) -> None:
if not self._session.in_transaction:
raise OperationalError("No active transaction to abort")

self._session.abort_transaction()
execute_with_retry(
lambda: self._session.abort_transaction(),
self._retry_config,
"abort MongoDB transaction",
)
self._in_transaction = False
self._autocommit = True
_logger.info("Aborted MongoDB transaction")
Expand Down Expand Up @@ -460,7 +494,11 @@ def test_connection(self) -> bool:
"""Test if the connection is alive"""
try:
if self._client:
self._client.admin.command("ping")
execute_with_retry(
lambda: self._client.admin.command("ping"),
self._retry_config,
"connection health check ping",
)
return True
return False
except Exception as e:
Expand Down
2 changes: 2 additions & 0 deletions pymongosql/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def execute(self: _T, operation: str, parameters: Optional[Any] = None) -> _T:
command_result=result,
execution_plan=execution_plan_for_rs,
database=self.connection.database,
retry_config=self.connection.retry_config,
**self._kwargs,
)
else:
Expand All @@ -125,6 +126,7 @@ def execute(self: _T, operation: str, parameters: Optional[Any] = None) -> _T:
},
execution_plan=stub_plan,
database=self.connection.database,
retry_config=self.connection.retry_config,
**self._kwargs,
)
# Store the actual insert result for reference
Expand Down
Loading
Loading