From cf7a1aaa2f183ac4dddbe3b076f0c71ec55ea7c0 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 19 Aug 2025 12:56:02 -0700 Subject: [PATCH 1/8] PYTHON-5504 Prototype exponential backoff in with_transaction (#2492) --- pymongo/asynchronous/client_session.py | 10 ++++++++++ pymongo/synchronous/client_session.py | 9 +++++++++ 2 files changed, 19 insertions(+) diff --git a/pymongo/asynchronous/client_session.py b/pymongo/asynchronous/client_session.py index c30fc6679f..4bb927d995 100644 --- a/pymongo/asynchronous/client_session.py +++ b/pymongo/asynchronous/client_session.py @@ -135,7 +135,9 @@ from __future__ import annotations +import asyncio import collections +import random import time import uuid from collections.abc import Mapping as _Mapping @@ -471,6 +473,8 @@ def _max_time_expired_error(exc: PyMongoError) -> bool: # This limit is non-configurable and was chosen to be twice the 60 second # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 +_BACKOFF_MAX = 1 +_BACKOFF_INITIAL = 0.050 # 50ms initial backoff def _within_time_limit(start_time: float) -> bool: @@ -700,7 +704,13 @@ async def callback(session, custom_arg, custom_kwarg=None): https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/transactions-convenient-api.md#handling-errors-inside-the-callback """ start_time = time.monotonic() + retry = 0 while True: + if retry: # Implement exponential backoff on retry. + jitter = random.random() # noqa: S311 + backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX) + await asyncio.sleep(backoff) + retry += 1 await self.start_transaction( read_concern, write_concern, read_preference, max_commit_time_ms ) diff --git a/pymongo/synchronous/client_session.py b/pymongo/synchronous/client_session.py index 68a01dd7e7..a8f03fac74 100644 --- a/pymongo/synchronous/client_session.py +++ b/pymongo/synchronous/client_session.py @@ -136,6 +136,7 @@ from __future__ import annotations import collections +import random import time import uuid from collections.abc import Mapping as _Mapping @@ -470,6 +471,8 @@ def _max_time_expired_error(exc: PyMongoError) -> bool: # This limit is non-configurable and was chosen to be twice the 60 second # default value of MongoDB's `transactionLifetimeLimitSeconds` parameter. _WITH_TRANSACTION_RETRY_TIME_LIMIT = 120 +_BACKOFF_MAX = 1 +_BACKOFF_INITIAL = 0.050 # 50ms initial backoff def _within_time_limit(start_time: float) -> bool: @@ -699,7 +702,13 @@ def callback(session, custom_arg, custom_kwarg=None): https://github.com/mongodb/specifications/blob/master/source/transactions-convenient-api/transactions-convenient-api.md#handling-errors-inside-the-callback """ start_time = time.monotonic() + retry = 0 while True: + if retry: # Implement exponential backoff on retry. + jitter = random.random() # noqa: S311 + backoff = jitter * min(_BACKOFF_INITIAL * (2**retry), _BACKOFF_MAX) + time.sleep(backoff) + retry += 1 self.start_transaction(read_concern, write_concern, read_preference, max_commit_time_ms) try: ret = callback(self) From 75eee91818dfe83257e9467c04103529ccdf26ee Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 21 Aug 2025 09:46:15 -0700 Subject: [PATCH 2/8] PYTHON-5505 Prototype system overload retry loop for all operations (#2497) All commands that fail with the "Retryable" error label will be retried up to 3 times. When the error includes the "SystemOverloaded" error label we apply exponential backoff with jitter before attempting a retry. --- pymongo/asynchronous/collection.py | 5 +- pymongo/asynchronous/database.py | 5 + pymongo/asynchronous/helpers.py | 41 +++++++ pymongo/asynchronous/mongo_client.py | 56 ++++++--- pymongo/synchronous/collection.py | 5 +- pymongo/synchronous/database.py | 5 + pymongo/synchronous/helpers.py | 41 +++++++ pymongo/synchronous/mongo_client.py | 56 ++++++--- test/asynchronous/test_backpressure.py | 155 +++++++++++++++++++++++++ test/test_backpressure.py | 155 +++++++++++++++++++++++++ tools/synchro.py | 1 + 11 files changed, 491 insertions(+), 34 deletions(-) create mode 100644 test/asynchronous/test_backpressure.py create mode 100644 test/test_backpressure.py diff --git a/pymongo/asynchronous/collection.py b/pymongo/asynchronous/collection.py index 741c11e551..dead0ed4dc 100644 --- a/pymongo/asynchronous/collection.py +++ b/pymongo/asynchronous/collection.py @@ -58,6 +58,7 @@ AsyncCursor, AsyncRawBatchCursor, ) +from pymongo.asynchronous.helpers import _retry_overload from pymongo.collation import validate_collation_or_none from pymongo.common import _ecoc_coll_name, _esc_coll_name from pymongo.errors import ( @@ -2227,6 +2228,7 @@ async def create_indexes( return await self._create_indexes(indexes, session, **kwargs) @_csot.apply + @_retry_overload async def _create_indexes( self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any ) -> list[str]: @@ -2422,7 +2424,6 @@ async def drop_indexes( kwargs["comment"] = comment await self._drop_index("*", session=session, **kwargs) - @_csot.apply async def drop_index( self, index_or_name: _IndexKeyHint, @@ -2472,6 +2473,7 @@ async def drop_index( await self._drop_index(index_or_name, session, comment, **kwargs) @_csot.apply + @_retry_overload async def _drop_index( self, index_or_name: _IndexKeyHint, @@ -3079,6 +3081,7 @@ async def aggregate_raw_batches( ) @_csot.apply + @_retry_overload async def rename( self, new_name: str, diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index f70c2b403f..f3b35a0dcb 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -38,6 +38,7 @@ from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream from pymongo.asynchronous.collection import AsyncCollection from pymongo.asynchronous.command_cursor import AsyncCommandCursor +from pymongo.asynchronous.helpers import _retry_overload from pymongo.common import _ecoc_coll_name, _esc_coll_name from pymongo.database_shared import _check_name, _CodecDocumentType from pymongo.errors import CollectionInvalid, InvalidOperation @@ -477,6 +478,7 @@ async def watch( return change_stream @_csot.apply + @_retry_overload async def create_collection( self, name: str, @@ -816,6 +818,7 @@ async def command( ... @_csot.apply + @_retry_overload async def command( self, command: Union[str, MutableMapping[str, Any]], @@ -947,6 +950,7 @@ async def command( ) @_csot.apply + @_retry_overload async def cursor_command( self, command: Union[str, MutableMapping[str, Any]], @@ -1264,6 +1268,7 @@ async def _drop_helper( ) @_csot.apply + @_retry_overload async def drop_collection( self, name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]], diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 54fd64f74a..49d5ec604e 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -17,8 +17,11 @@ import asyncio import builtins +import functools +import random import socket import sys +import time from typing import ( Any, Callable, @@ -28,6 +31,7 @@ from pymongo.errors import ( OperationFailure, + PyMongoError, ) from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE @@ -38,6 +42,7 @@ def _handle_reauth(func: F) -> F: + @functools.wraps(func) async def inner(*args: Any, **kwargs: Any) -> Any: no_reauth = kwargs.pop("no_reauth", False) from pymongo.asynchronous.pool import AsyncConnection @@ -70,6 +75,42 @@ async def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) +_MAX_RETRIES = 3 +_BACKOFF_INITIAL = 0.05 +_BACKOFF_MAX = 10 +_TIME = time + + +async def _backoff( + attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX +) -> None: + jitter = random.random() # noqa: S311 + backoff = jitter * min(initial_delay * (2**attempt), max_delay) + await asyncio.sleep(backoff) + + +def _retry_overload(func: F) -> F: + @functools.wraps(func) + async def inner(*args: Any, **kwargs: Any) -> Any: + attempt = 0 + while True: + try: + return await func(*args, **kwargs) + except PyMongoError as exc: + if not exc.has_error_label("Retryable"): + raise + attempt += 1 + if attempt > _MAX_RETRIES: + raise + + # Implement exponential backoff on retry. + if exc.has_error_label("SystemOverloaded"): + await _backoff(attempt) + continue + + return cast(F, inner) + + async def _getaddrinfo( host: Any, port: Any, **kwargs: Any ) -> list[ diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index b616647791..ae6e819334 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -67,6 +67,7 @@ from pymongo.asynchronous.client_bulk import _AsyncClientBulk from pymongo.asynchronous.client_session import _EmptyServerSession from pymongo.asynchronous.command_cursor import AsyncCommandCursor +from pymongo.asynchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload from pymongo.asynchronous.settings import TopologySettings from pymongo.asynchronous.topology import Topology, _ErrorContext from pymongo.client_options import ClientOptions @@ -2398,6 +2399,7 @@ async def list_database_names( return [doc["name"] async for doc in res] @_csot.apply + @_retry_overload async def drop_database( self, name_or_database: Union[str, database.AsyncDatabase[_DocumentTypeArg]], @@ -2735,6 +2737,7 @@ def __init__( ): self._last_error: Optional[Exception] = None self._retrying = False + self._always_retryable = False self._multiple_retries = _csot.get_timeout() is not None self._client = mongo_client @@ -2783,14 +2786,22 @@ async def run(self) -> T: # most likely be a waste of time. raise except PyMongoError as exc: + always_retryable = False + overloaded = False + exc_to_check = exc # Execute specialized catch on read if self._is_read: if isinstance(exc, (ConnectionFailure, OperationFailure)): # ConnectionFailures do not supply a code property exc_code = getattr(exc, "code", None) - if self._is_not_eligible_for_retry() or ( - isinstance(exc, OperationFailure) - and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES + always_retryable = exc.has_error_label("Retryable") + overloaded = exc.has_error_label("SystemOverloaded") + if not always_retryable and ( + self._is_not_eligible_for_retry() + or ( + isinstance(exc, OperationFailure) + and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES + ) ): raise self._retrying = True @@ -2801,19 +2812,22 @@ async def run(self) -> T: # Specialized catch on write operation if not self._is_read: - if not self._retryable: + if isinstance(exc, ClientBulkWriteException) and isinstance( + exc.error, PyMongoError + ): + exc_to_check = exc.error + retryable_write_label = exc_to_check.has_error_label("RetryableWriteError") + always_retryable = exc_to_check.has_error_label("Retryable") + overloaded = exc_to_check.has_error_label("SystemOverloaded") + if not self._retryable and not always_retryable: raise - if isinstance(exc, ClientBulkWriteException) and exc.error: - retryable_write_error_exc = isinstance( - exc.error, PyMongoError - ) and exc.error.has_error_label("RetryableWriteError") - else: - retryable_write_error_exc = exc.has_error_label("RetryableWriteError") - if retryable_write_error_exc: + if retryable_write_label or always_retryable: assert self._session await self._session._unpin() - if not retryable_write_error_exc or self._is_not_eligible_for_retry(): - if exc.has_error_label("NoWritesPerformed") and self._last_error: + if not always_retryable and ( + not retryable_write_label or self._is_not_eligible_for_retry() + ): + if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc else: raise @@ -2822,7 +2836,7 @@ async def run(self) -> T: self._bulk.retrying = True else: self._retrying = True - if not exc.has_error_label("NoWritesPerformed"): + if not exc_to_check.has_error_label("NoWritesPerformed"): self._last_error = exc if self._last_error is None: self._last_error = exc @@ -2830,6 +2844,16 @@ async def run(self) -> T: if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded: self._deprioritized_servers.append(self._server) + self._always_retryable = always_retryable + if always_retryable: + if self._attempt_number > _MAX_RETRIES: + if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: + raise self._last_error from exc + else: + raise + if overloaded: + await _backoff(self._attempt_number) + def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" return not self._retryable or (self._is_retrying() and not self._multiple_retries) @@ -2891,7 +2915,7 @@ async def _write(self) -> T: and conn.supports_sessions ) is_mongos = conn.is_mongos - if not sessions_supported: + if not self._always_retryable and not sessions_supported: # A retry is not possible because this server does # not support sessions raise the last error. self._check_last_error() @@ -2923,7 +2947,7 @@ async def _read(self) -> T: conn, read_pref, ): - if self._retrying and not self._retryable: + if self._retrying and not self._retryable and not self._always_retryable: self._check_last_error() if self._retrying: _debug_log( diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index 9f32deb765..3df867f7bc 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -89,6 +89,7 @@ Cursor, RawBatchCursor, ) +from pymongo.synchronous.helpers import _retry_overload from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean @@ -2224,6 +2225,7 @@ def create_indexes( return self._create_indexes(indexes, session, **kwargs) @_csot.apply + @_retry_overload def _create_indexes( self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any ) -> list[str]: @@ -2419,7 +2421,6 @@ def drop_indexes( kwargs["comment"] = comment self._drop_index("*", session=session, **kwargs) - @_csot.apply def drop_index( self, index_or_name: _IndexKeyHint, @@ -2469,6 +2470,7 @@ def drop_index( self._drop_index(index_or_name, session, comment, **kwargs) @_csot.apply + @_retry_overload def _drop_index( self, index_or_name: _IndexKeyHint, @@ -3072,6 +3074,7 @@ def aggregate_raw_batches( ) @_csot.apply + @_retry_overload def rename( self, new_name: str, diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index e30f97817c..d8b9ae6a10 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -43,6 +43,7 @@ from pymongo.synchronous.change_stream import DatabaseChangeStream from pymongo.synchronous.collection import Collection from pymongo.synchronous.command_cursor import CommandCursor +from pymongo.synchronous.helpers import _retry_overload from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline if TYPE_CHECKING: @@ -477,6 +478,7 @@ def watch( return change_stream @_csot.apply + @_retry_overload def create_collection( self, name: str, @@ -816,6 +818,7 @@ def command( ... @_csot.apply + @_retry_overload def command( self, command: Union[str, MutableMapping[str, Any]], @@ -945,6 +948,7 @@ def command( ) @_csot.apply + @_retry_overload def cursor_command( self, command: Union[str, MutableMapping[str, Any]], @@ -1257,6 +1261,7 @@ def _drop_helper( ) @_csot.apply + @_retry_overload def drop_collection( self, name_or_collection: Union[str, Collection[_DocumentTypeArg]], diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index bc69a49e80..889382b19c 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -17,8 +17,11 @@ import asyncio import builtins +import functools +import random import socket import sys +import time from typing import ( Any, Callable, @@ -28,6 +31,7 @@ from pymongo.errors import ( OperationFailure, + PyMongoError, ) from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE @@ -38,6 +42,7 @@ def _handle_reauth(func: F) -> F: + @functools.wraps(func) def inner(*args: Any, **kwargs: Any) -> Any: no_reauth = kwargs.pop("no_reauth", False) from pymongo.message import _BulkWriteContext @@ -70,6 +75,42 @@ def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) +_MAX_RETRIES = 3 +_BACKOFF_INITIAL = 0.05 +_BACKOFF_MAX = 10 +_TIME = time + + +def _backoff( + attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX +) -> None: + jitter = random.random() # noqa: S311 + backoff = jitter * min(initial_delay * (2**attempt), max_delay) + time.sleep(backoff) + + +def _retry_overload(func: F) -> F: + @functools.wraps(func) + def inner(*args: Any, **kwargs: Any) -> Any: + attempt = 0 + while True: + try: + return func(*args, **kwargs) + except PyMongoError as exc: + if not exc.has_error_label("Retryable"): + raise + attempt += 1 + if attempt > _MAX_RETRIES: + raise + + # Implement exponential backoff on retry. + if exc.has_error_label("SystemOverloaded"): + _backoff(attempt) + continue + + return cast(F, inner) + + def _getaddrinfo( host: Any, port: Any, **kwargs: Any ) -> list[ diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index ef0663584c..dcd8c50cca 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -110,6 +110,7 @@ from pymongo.synchronous.client_bulk import _ClientBulk from pymongo.synchronous.client_session import _EmptyServerSession from pymongo.synchronous.command_cursor import CommandCursor +from pymongo.synchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload from pymongo.synchronous.settings import TopologySettings from pymongo.synchronous.topology import Topology, _ErrorContext from pymongo.topology_description import TOPOLOGY_TYPE, TopologyDescription @@ -2388,6 +2389,7 @@ def list_database_names( return [doc["name"] for doc in res] @_csot.apply + @_retry_overload def drop_database( self, name_or_database: Union[str, database.Database[_DocumentTypeArg]], @@ -2725,6 +2727,7 @@ def __init__( ): self._last_error: Optional[Exception] = None self._retrying = False + self._always_retryable = False self._multiple_retries = _csot.get_timeout() is not None self._client = mongo_client @@ -2773,14 +2776,22 @@ def run(self) -> T: # most likely be a waste of time. raise except PyMongoError as exc: + always_retryable = False + overloaded = False + exc_to_check = exc # Execute specialized catch on read if self._is_read: if isinstance(exc, (ConnectionFailure, OperationFailure)): # ConnectionFailures do not supply a code property exc_code = getattr(exc, "code", None) - if self._is_not_eligible_for_retry() or ( - isinstance(exc, OperationFailure) - and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES + always_retryable = exc.has_error_label("Retryable") + overloaded = exc.has_error_label("SystemOverloaded") + if not always_retryable and ( + self._is_not_eligible_for_retry() + or ( + isinstance(exc, OperationFailure) + and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES + ) ): raise self._retrying = True @@ -2791,19 +2802,22 @@ def run(self) -> T: # Specialized catch on write operation if not self._is_read: - if not self._retryable: + if isinstance(exc, ClientBulkWriteException) and isinstance( + exc.error, PyMongoError + ): + exc_to_check = exc.error + retryable_write_label = exc_to_check.has_error_label("RetryableWriteError") + always_retryable = exc_to_check.has_error_label("Retryable") + overloaded = exc_to_check.has_error_label("SystemOverloaded") + if not self._retryable and not always_retryable: raise - if isinstance(exc, ClientBulkWriteException) and exc.error: - retryable_write_error_exc = isinstance( - exc.error, PyMongoError - ) and exc.error.has_error_label("RetryableWriteError") - else: - retryable_write_error_exc = exc.has_error_label("RetryableWriteError") - if retryable_write_error_exc: + if retryable_write_label or always_retryable: assert self._session self._session._unpin() - if not retryable_write_error_exc or self._is_not_eligible_for_retry(): - if exc.has_error_label("NoWritesPerformed") and self._last_error: + if not always_retryable and ( + not retryable_write_label or self._is_not_eligible_for_retry() + ): + if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc else: raise @@ -2812,7 +2826,7 @@ def run(self) -> T: self._bulk.retrying = True else: self._retrying = True - if not exc.has_error_label("NoWritesPerformed"): + if not exc_to_check.has_error_label("NoWritesPerformed"): self._last_error = exc if self._last_error is None: self._last_error = exc @@ -2820,6 +2834,16 @@ def run(self) -> T: if self._client.topology_description.topology_type == TOPOLOGY_TYPE.Sharded: self._deprioritized_servers.append(self._server) + self._always_retryable = always_retryable + if always_retryable: + if self._attempt_number > _MAX_RETRIES: + if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: + raise self._last_error from exc + else: + raise + if overloaded: + _backoff(self._attempt_number) + def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" return not self._retryable or (self._is_retrying() and not self._multiple_retries) @@ -2881,7 +2905,7 @@ def _write(self) -> T: and conn.supports_sessions ) is_mongos = conn.is_mongos - if not sessions_supported: + if not self._always_retryable and not sessions_supported: # A retry is not possible because this server does # not support sessions raise the last error. self._check_last_error() @@ -2913,7 +2937,7 @@ def _read(self) -> T: conn, read_pref, ): - if self._retrying and not self._retryable: + if self._retrying and not self._retryable and not self._always_retryable: self._check_last_error() if self._retrying: _debug_log( diff --git a/test/asynchronous/test_backpressure.py b/test/asynchronous/test_backpressure.py new file mode 100644 index 0000000000..a9a6fb56f5 --- /dev/null +++ b/test/asynchronous/test_backpressure.py @@ -0,0 +1,155 @@ +# Copyright 2025-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test Client Backpressure spec.""" +from __future__ import annotations + +import sys + +sys.path[0:0] = [""] + +from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest + +from pymongo.asynchronous.helpers import _MAX_RETRIES +from pymongo.errors import PyMongoError + +_IS_SYNC = False + +# Mock an system overload error. +mock_overload_error = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find", "insert", "update"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["Retryable"], + }, +} + + +class TestBackpressure(AsyncIntegrationTest): + RUN_ON_LOAD_BALANCER = True + + @async_client_context.require_failCommand_appName + async def test_retry_overload_error_command(self): + await self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + async with self.fail_point(fail_many): + await self.db.command("find", "t") + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + async with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + await self.db.command("find", "t") + + self.assertIn("Retryable", str(error.exception)) + + @async_client_context.require_failCommand_appName + async def test_retry_overload_error_find(self): + await self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + async with self.fail_point(fail_many): + await self.db.t.find_one() + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + async with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + await self.db.t.find_one() + + self.assertIn("Retryable", str(error.exception)) + + @async_client_context.require_failCommand_appName + async def test_retry_overload_error_insert_one(self): + await self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + async with self.fail_point(fail_many): + await self.db.t.find_one() + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + async with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + await self.db.t.find_one() + + self.assertIn("Retryable", str(error.exception)) + + @async_client_context.require_failCommand_appName + async def test_retry_overload_error_update_many(self): + # Even though update_many is not a retryable write operation, it will + # still be retried via the "Retryable" error label. + await self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + async with self.fail_point(fail_many): + await self.db.t.update_many({}, {"$set": {"x": 2}}) + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + async with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + await self.db.t.update_many({}, {"$set": {"x": 2}}) + + self.assertIn("Retryable", str(error.exception)) + + @async_client_context.require_failCommand_appName + async def test_retry_overload_error_getMore(self): + coll = self.db.t + await coll.insert_many([{"x": 1} for _ in range(10)]) + + # Ensure command is retried on overload error. + fail_many = { + "configureFailPoint": "failCommand", + "mode": {"times": _MAX_RETRIES}, + "data": { + "failCommands": ["getMore"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["Retryable"], + }, + } + cursor = coll.find(batch_size=2) + await cursor.next() + async with self.fail_point(fail_many): + await cursor.to_list() + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = fail_many.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + cursor = coll.find(batch_size=2) + await cursor.next() + async with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + await cursor.to_list() + + self.assertIn("Retryable", str(error.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_backpressure.py b/test/test_backpressure.py new file mode 100644 index 0000000000..324dd6f15a --- /dev/null +++ b/test/test_backpressure.py @@ -0,0 +1,155 @@ +# Copyright 2025-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test Client Backpressure spec.""" +from __future__ import annotations + +import sys + +sys.path[0:0] = [""] + +from test import IntegrationTest, client_context, unittest + +from pymongo.errors import PyMongoError +from pymongo.synchronous.helpers import _MAX_RETRIES + +_IS_SYNC = True + +# Mock an system overload error. +mock_overload_error = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "failCommands": ["find", "insert", "update"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["Retryable"], + }, +} + + +class TestBackpressure(IntegrationTest): + RUN_ON_LOAD_BALANCER = True + + @client_context.require_failCommand_appName + def test_retry_overload_error_command(self): + self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + with self.fail_point(fail_many): + self.db.command("find", "t") + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + self.db.command("find", "t") + + self.assertIn("Retryable", str(error.exception)) + + @client_context.require_failCommand_appName + def test_retry_overload_error_find(self): + self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + with self.fail_point(fail_many): + self.db.t.find_one() + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + self.db.t.find_one() + + self.assertIn("Retryable", str(error.exception)) + + @client_context.require_failCommand_appName + def test_retry_overload_error_insert_one(self): + self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + with self.fail_point(fail_many): + self.db.t.find_one() + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + self.db.t.find_one() + + self.assertIn("Retryable", str(error.exception)) + + @client_context.require_failCommand_appName + def test_retry_overload_error_update_many(self): + # Even though update_many is not a retryable write operation, it will + # still be retried via the "Retryable" error label. + self.db.t.insert_one({"x": 1}) + + # Ensure command is retried on overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": _MAX_RETRIES} + with self.fail_point(fail_many): + self.db.t.update_many({}, {"$set": {"x": 2}}) + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + self.db.t.update_many({}, {"$set": {"x": 2}}) + + self.assertIn("Retryable", str(error.exception)) + + @client_context.require_failCommand_appName + def test_retry_overload_error_getMore(self): + coll = self.db.t + coll.insert_many([{"x": 1} for _ in range(10)]) + + # Ensure command is retried on overload error. + fail_many = { + "configureFailPoint": "failCommand", + "mode": {"times": _MAX_RETRIES}, + "data": { + "failCommands": ["getMore"], + "errorCode": 462, # IngressRequestRateLimitExceeded + "errorLabels": ["Retryable"], + }, + } + cursor = coll.find(batch_size=2) + cursor.next() + with self.fail_point(fail_many): + cursor.to_list() + + # Ensure command stops retrying after _MAX_RETRIES. + fail_too_many = fail_many.copy() + fail_too_many["mode"] = {"times": _MAX_RETRIES + 1} + cursor = coll.find(batch_size=2) + cursor.next() + with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + cursor.to_list() + + self.assertIn("Retryable", str(error.exception)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/synchro.py b/tools/synchro.py index 9a760c0ad7..44698134cd 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -208,6 +208,7 @@ def async_only_test(f: str) -> bool: "test_auth_oidc.py", "test_auth_spec.py", "test_bulk.py", + "test_backpressure.py", "test_change_stream.py", "test_client.py", "test_client_bulk_write.py", From 875c5640d751eadb637d019e954f1278f18953c5 Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Fri, 22 Aug 2025 14:56:23 -0700 Subject: [PATCH 3/8] PYTHON-5506 Prototype adaptive token bucket retry (#2501) Add adaptive token bucket based retry policy. Successfully completed commands deposit 0.1 token. Failed retry attempts consume 1 token. A retry is only permitted if there is an available token. Token bucket starts full with the maximum 1000 tokens. --- pymongo/asynchronous/collection.py | 1 + pymongo/asynchronous/database.py | 1 + pymongo/asynchronous/helpers.py | 105 ++++++++++++++++++++++--- pymongo/asynchronous/mongo_client.py | 19 +++-- pymongo/synchronous/collection.py | 1 + pymongo/synchronous/database.py | 1 + pymongo/synchronous/helpers.py | 103 +++++++++++++++++++++--- pymongo/synchronous/mongo_client.py | 19 +++-- test/asynchronous/test_backpressure.py | 79 ++++++++++++++++++- test/test_backpressure.py | 79 ++++++++++++++++++- 10 files changed, 373 insertions(+), 35 deletions(-) diff --git a/pymongo/asynchronous/collection.py b/pymongo/asynchronous/collection.py index dead0ed4dc..6ff62e9fe3 100644 --- a/pymongo/asynchronous/collection.py +++ b/pymongo/asynchronous/collection.py @@ -253,6 +253,7 @@ def __init__( unicode_decode_error_handler="replace", document_class=dict ) self._timeout = database.client.options.timeout + self._retry_policy = database.client._retry_policy if create or kwargs: if _IS_SYNC: diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index f3b35a0dcb..8abc7059d0 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -136,6 +136,7 @@ def __init__( self._name = name self._client: AsyncMongoClient[_DocumentType] = client self._timeout = client.options.timeout + self._retry_policy = client._retry_policy @property def client(self) -> AsyncMongoClient[_DocumentType]: diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 49d5ec604e..6ef3beacf5 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -21,7 +21,7 @@ import random import socket import sys -import time +import time as time # noqa: PLC0414 # needed in sync version from typing import ( Any, Callable, @@ -29,11 +29,13 @@ cast, ) +from pymongo import _csot from pymongo.errors import ( OperationFailure, PyMongoError, ) from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE +from pymongo.lock import _async_create_lock _IS_SYNC = False @@ -78,34 +80,115 @@ async def inner(*args: Any, **kwargs: Any) -> Any: _MAX_RETRIES = 3 _BACKOFF_INITIAL = 0.05 _BACKOFF_MAX = 10 -_TIME = time +# DRIVERS-3240 will determine these defaults. +DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 +DEFAULT_RETRY_TOKEN_RETURN = 0.1 -async def _backoff( +def _backoff( attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX -) -> None: +) -> float: jitter = random.random() # noqa: S311 - backoff = jitter * min(initial_delay * (2**attempt), max_delay) - await asyncio.sleep(backoff) + return jitter * min(initial_delay * (2**attempt), max_delay) + + +class _TokenBucket: + """A token bucket implementation for rate limiting.""" + + def __init__( + self, + capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY, + return_rate: float = DEFAULT_RETRY_TOKEN_RETURN, + ): + self.lock = _async_create_lock() + self.capacity = capacity + # DRIVERS-3240 will determine how full the bucket should start. + self.tokens = capacity + self.return_rate = return_rate + + async def consume(self) -> bool: + """Consume a token from the bucket if available.""" + async with self.lock: + if self.tokens >= 1: + self.tokens -= 1 + return True + return False + + async def deposit(self, retry: bool = False) -> None: + """Deposit a token back into the bucket.""" + retry_token = 1 if retry else 0 + async with self.lock: + self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate) + + +class _RetryPolicy: + """A retry limiter that performs exponential backoff with jitter. + + Retry attempts are limited by a token bucket to prevent overwhelming the server during + a prolonged outage or high load. + """ + + def __init__( + self, + token_bucket: _TokenBucket, + attempts: int = _MAX_RETRIES, + backoff_initial: float = _BACKOFF_INITIAL, + backoff_max: float = _BACKOFF_MAX, + ): + self.token_bucket = token_bucket + self.attempts = attempts + self.backoff_initial = backoff_initial + self.backoff_max = backoff_max + + async def record_success(self, retry: bool) -> None: + """Record a successful operation.""" + await self.token_bucket.deposit(retry) + + def backoff(self, attempt: int) -> float: + """Return the backoff duration for the given .""" + return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + + async def should_retry(self, attempt: int, delay: float) -> bool: + """Return if we have budget to retry and how long to backoff.""" + if attempt > self.attempts: + return False + + # If the delay would exceed the deadline, bail early before consuming a token. + if _csot.get_timeout(): + if time.monotonic() + delay > _csot.get_deadline(): + return False + + # Check token bucket last since we only want to consume a token if we actually retry. + if not await self.token_bucket.consume(): + # DRIVERS-3246 Improve diagnostics when this case happens. + # We could add info to the exception and log. + return False + return True def _retry_overload(func: F) -> F: @functools.wraps(func) - async def inner(*args: Any, **kwargs: Any) -> Any: + async def inner(self: Any, *args: Any, **kwargs: Any) -> Any: + retry_policy = self._retry_policy attempt = 0 while True: try: - return await func(*args, **kwargs) + res = await func(self, *args, **kwargs) + await retry_policy.record_success(retry=attempt > 0) + return res except PyMongoError as exc: if not exc.has_error_label("Retryable"): raise attempt += 1 - if attempt > _MAX_RETRIES: + delay = 0 + if exc.has_error_label("SystemOverloaded"): + delay = retry_policy.backoff(attempt) + if not await retry_policy.should_retry(attempt, delay): raise # Implement exponential backoff on retry. - if exc.has_error_label("SystemOverloaded"): - await _backoff(attempt) + if delay: + await asyncio.sleep(delay) continue return cast(F, inner) diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index ae6e819334..d9994e9902 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -35,6 +35,7 @@ import asyncio import contextlib import os +import time as time # noqa: PLC0414 # needed in sync version import warnings import weakref from collections import defaultdict @@ -67,7 +68,11 @@ from pymongo.asynchronous.client_bulk import _AsyncClientBulk from pymongo.asynchronous.client_session import _EmptyServerSession from pymongo.asynchronous.command_cursor import AsyncCommandCursor -from pymongo.asynchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload +from pymongo.asynchronous.helpers import ( + _retry_overload, + _RetryPolicy, + _TokenBucket, +) from pymongo.asynchronous.settings import TopologySettings from pymongo.asynchronous.topology import Topology, _ErrorContext from pymongo.client_options import ClientOptions @@ -774,6 +779,7 @@ def __init__( self._timeout: float | None = None self._topology_settings: TopologySettings = None # type: ignore[assignment] self._event_listeners: _EventListeners | None = None + self._retry_policy = _RetryPolicy(_TokenBucket()) # _pool_class, _monitor_class, and _condition_class are for deep # customization of PyMongo, e.g. Motor. @@ -2740,7 +2746,7 @@ def __init__( self._always_retryable = False self._multiple_retries = _csot.get_timeout() is not None self._client = mongo_client - + self._retry_policy = mongo_client._retry_policy self._func = func self._bulk = bulk self._session = session @@ -2775,7 +2781,9 @@ async def run(self) -> T: while True: self._check_last_error(check_csot=True) try: - return await self._read() if self._is_read else await self._write() + res = await self._read() if self._is_read else await self._write() + await self._retry_policy.record_success(self._attempt_number > 0) + return res except ServerSelectionTimeoutError: # The application may think the write was never attempted # if we raise ServerSelectionTimeoutError on the retry @@ -2846,13 +2854,14 @@ async def run(self) -> T: self._always_retryable = always_retryable if always_retryable: - if self._attempt_number > _MAX_RETRIES: + delay = self._retry_policy.backoff(self._attempt_number) if overloaded else 0 + if not await self._retry_policy.should_retry(self._attempt_number, delay): if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc else: raise if overloaded: - await _backoff(self._attempt_number) + await asyncio.sleep(delay) def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index 3df867f7bc..324139d40a 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -256,6 +256,7 @@ def __init__( unicode_decode_error_handler="replace", document_class=dict ) self._timeout = database.client.options.timeout + self._retry_policy = database.client._retry_policy if create or kwargs: if _IS_SYNC: diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index d8b9ae6a10..62f8f69067 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -136,6 +136,7 @@ def __init__( self._name = name self._client: MongoClient[_DocumentType] = client self._timeout = client.options.timeout + self._retry_policy = client._retry_policy @property def client(self) -> MongoClient[_DocumentType]: diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index 889382b19c..0a2cd71062 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -21,7 +21,7 @@ import random import socket import sys -import time +import time as time # noqa: PLC0414 # needed in sync version from typing import ( Any, Callable, @@ -29,11 +29,13 @@ cast, ) +from pymongo import _csot from pymongo.errors import ( OperationFailure, PyMongoError, ) from pymongo.helpers_shared import _REAUTHENTICATION_REQUIRED_CODE +from pymongo.lock import _create_lock _IS_SYNC = True @@ -78,34 +80,115 @@ def inner(*args: Any, **kwargs: Any) -> Any: _MAX_RETRIES = 3 _BACKOFF_INITIAL = 0.05 _BACKOFF_MAX = 10 -_TIME = time +# DRIVERS-3240 will determine these defaults. +DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 +DEFAULT_RETRY_TOKEN_RETURN = 0.1 def _backoff( attempt: int, initial_delay: float = _BACKOFF_INITIAL, max_delay: float = _BACKOFF_MAX -) -> None: +) -> float: jitter = random.random() # noqa: S311 - backoff = jitter * min(initial_delay * (2**attempt), max_delay) - time.sleep(backoff) + return jitter * min(initial_delay * (2**attempt), max_delay) + + +class _TokenBucket: + """A token bucket implementation for rate limiting.""" + + def __init__( + self, + capacity: float = DEFAULT_RETRY_TOKEN_CAPACITY, + return_rate: float = DEFAULT_RETRY_TOKEN_RETURN, + ): + self.lock = _create_lock() + self.capacity = capacity + # DRIVERS-3240 will determine how full the bucket should start. + self.tokens = capacity + self.return_rate = return_rate + + def consume(self) -> bool: + """Consume a token from the bucket if available.""" + with self.lock: + if self.tokens >= 1: + self.tokens -= 1 + return True + return False + + def deposit(self, retry: bool = False) -> None: + """Deposit a token back into the bucket.""" + retry_token = 1 if retry else 0 + with self.lock: + self.tokens = min(self.capacity, self.tokens + retry_token + self.return_rate) + + +class _RetryPolicy: + """A retry limiter that performs exponential backoff with jitter. + + Retry attempts are limited by a token bucket to prevent overwhelming the server during + a prolonged outage or high load. + """ + + def __init__( + self, + token_bucket: _TokenBucket, + attempts: int = _MAX_RETRIES, + backoff_initial: float = _BACKOFF_INITIAL, + backoff_max: float = _BACKOFF_MAX, + ): + self.token_bucket = token_bucket + self.attempts = attempts + self.backoff_initial = backoff_initial + self.backoff_max = backoff_max + + def record_success(self, retry: bool) -> None: + """Record a successful operation.""" + self.token_bucket.deposit(retry) + + def backoff(self, attempt: int) -> float: + """Return the backoff duration for the given .""" + return _backoff(max(0, attempt - 1), self.backoff_initial, self.backoff_max) + + def should_retry(self, attempt: int, delay: float) -> bool: + """Return if we have budget to retry and how long to backoff.""" + if attempt > self.attempts: + return False + + # If the delay would exceed the deadline, bail early before consuming a token. + if _csot.get_timeout(): + if time.monotonic() + delay > _csot.get_deadline(): + return False + + # Check token bucket last since we only want to consume a token if we actually retry. + if not self.token_bucket.consume(): + # DRIVERS-3246 Improve diagnostics when this case happens. + # We could add info to the exception and log. + return False + return True def _retry_overload(func: F) -> F: @functools.wraps(func) - def inner(*args: Any, **kwargs: Any) -> Any: + def inner(self: Any, *args: Any, **kwargs: Any) -> Any: + retry_policy = self._retry_policy attempt = 0 while True: try: - return func(*args, **kwargs) + res = func(self, *args, **kwargs) + retry_policy.record_success(retry=attempt > 0) + return res except PyMongoError as exc: if not exc.has_error_label("Retryable"): raise attempt += 1 - if attempt > _MAX_RETRIES: + delay = 0 + if exc.has_error_label("SystemOverloaded"): + delay = retry_policy.backoff(attempt) + if not retry_policy.should_retry(attempt, delay): raise # Implement exponential backoff on retry. - if exc.has_error_label("SystemOverloaded"): - _backoff(attempt) + if delay: + time.sleep(delay) continue return cast(F, inner) diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index dcd8c50cca..9beda807ef 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -35,6 +35,7 @@ import asyncio import contextlib import os +import time as time # noqa: PLC0414 # needed in sync version import warnings import weakref from collections import defaultdict @@ -110,7 +111,11 @@ from pymongo.synchronous.client_bulk import _ClientBulk from pymongo.synchronous.client_session import _EmptyServerSession from pymongo.synchronous.command_cursor import CommandCursor -from pymongo.synchronous.helpers import _MAX_RETRIES, _backoff, _retry_overload +from pymongo.synchronous.helpers import ( + _retry_overload, + _RetryPolicy, + _TokenBucket, +) from pymongo.synchronous.settings import TopologySettings from pymongo.synchronous.topology import Topology, _ErrorContext from pymongo.topology_description import TOPOLOGY_TYPE, TopologyDescription @@ -774,6 +779,7 @@ def __init__( self._timeout: float | None = None self._topology_settings: TopologySettings = None # type: ignore[assignment] self._event_listeners: _EventListeners | None = None + self._retry_policy = _RetryPolicy(_TokenBucket()) # _pool_class, _monitor_class, and _condition_class are for deep # customization of PyMongo, e.g. Motor. @@ -2730,7 +2736,7 @@ def __init__( self._always_retryable = False self._multiple_retries = _csot.get_timeout() is not None self._client = mongo_client - + self._retry_policy = mongo_client._retry_policy self._func = func self._bulk = bulk self._session = session @@ -2765,7 +2771,9 @@ def run(self) -> T: while True: self._check_last_error(check_csot=True) try: - return self._read() if self._is_read else self._write() + res = self._read() if self._is_read else self._write() + self._retry_policy.record_success(self._attempt_number > 0) + return res except ServerSelectionTimeoutError: # The application may think the write was never attempted # if we raise ServerSelectionTimeoutError on the retry @@ -2836,13 +2844,14 @@ def run(self) -> T: self._always_retryable = always_retryable if always_retryable: - if self._attempt_number > _MAX_RETRIES: + delay = self._retry_policy.backoff(self._attempt_number) if overloaded else 0 + if not self._retry_policy.should_retry(self._attempt_number, delay): if exc_to_check.has_error_label("NoWritesPerformed") and self._last_error: raise self._last_error from exc else: raise if overloaded: - _backoff(self._attempt_number) + time.sleep(delay) def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" diff --git a/test/asynchronous/test_backpressure.py b/test/asynchronous/test_backpressure.py index a9a6fb56f5..598236dbfe 100644 --- a/test/asynchronous/test_backpressure.py +++ b/test/asynchronous/test_backpressure.py @@ -15,13 +15,22 @@ """Test Client Backpressure spec.""" from __future__ import annotations +import asyncio import sys +import pymongo + sys.path[0:0] = [""] -from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest +from test.asynchronous import ( + AsyncIntegrationTest, + AsyncPyMongoTestCase, + async_client_context, + unittest, +) -from pymongo.asynchronous.helpers import _MAX_RETRIES +from pymongo.asynchronous import helpers +from pymongo.asynchronous.helpers import _MAX_RETRIES, _RetryPolicy, _TokenBucket from pymongo.errors import PyMongoError _IS_SYNC = False @@ -150,6 +159,72 @@ async def test_retry_overload_error_getMore(self): self.assertIn("Retryable", str(error.exception)) + @async_client_context.require_failCommand_appName + async def test_limit_retry_command(self): + client = await self.async_rs_or_single_client() + client._retry_policy.token_bucket.tokens = 1 + db = client.pymongo_test + await db.t.insert_one({"x": 1}) + + # Ensure command is retried once overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": 1} + async with self.fail_point(fail_many): + await db.command("find", "t") + + # Ensure command stops retrying when there are no tokens left. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": 2} + async with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + await db.command("find", "t") + + self.assertIn("Retryable", str(error.exception)) + + +class TestRetryPolicy(AsyncPyMongoTestCase): + async def test_retry_policy(self): + capacity = 10 + retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity)) + self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES) + self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL) + self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX) + for i in range(1, helpers._MAX_RETRIES + 1): + self.assertTrue(await retry_policy.should_retry(i, 0)) + self.assertFalse(await retry_policy.should_retry(helpers._MAX_RETRIES + 1, 0)) + for i in range(capacity - helpers._MAX_RETRIES): + self.assertTrue(await retry_policy.should_retry(1, 0)) + # No tokens left, should not retry. + self.assertFalse(await retry_policy.should_retry(1, 0)) + self.assertEqual(retry_policy.token_bucket.tokens, 0) + + # record_success should generate tokens. + for _ in range(int(2 / helpers.DEFAULT_RETRY_TOKEN_RETURN)): + await retry_policy.record_success(retry=False) + self.assertAlmostEqual(retry_policy.token_bucket.tokens, 2) + for i in range(2): + self.assertTrue(await retry_policy.should_retry(1, 0)) + self.assertFalse(await retry_policy.should_retry(1, 0)) + + # Recording a successful retry should return 1 additional token. + await retry_policy.record_success(retry=True) + self.assertAlmostEqual( + retry_policy.token_bucket.tokens, 1 + helpers.DEFAULT_RETRY_TOKEN_RETURN + ) + self.assertTrue(await retry_policy.should_retry(1, 0)) + self.assertFalse(await retry_policy.should_retry(1, 0)) + self.assertAlmostEqual(retry_policy.token_bucket.tokens, helpers.DEFAULT_RETRY_TOKEN_RETURN) + + async def test_retry_policy_csot(self): + retry_policy = _RetryPolicy(_TokenBucket()) + self.assertTrue(await retry_policy.should_retry(1, 0.5)) + with pymongo.timeout(0.5): + self.assertTrue(await retry_policy.should_retry(1, 0)) + self.assertTrue(await retry_policy.should_retry(1, 0.1)) + # Would exceed the timeout, should not retry. + self.assertFalse(await retry_policy.should_retry(1, 1.0)) + self.assertTrue(await retry_policy.should_retry(1, 1.0)) + if __name__ == "__main__": unittest.main() diff --git a/test/test_backpressure.py b/test/test_backpressure.py index 324dd6f15a..182ce424a9 100644 --- a/test/test_backpressure.py +++ b/test/test_backpressure.py @@ -15,14 +15,23 @@ """Test Client Backpressure spec.""" from __future__ import annotations +import asyncio import sys +import pymongo + sys.path[0:0] = [""] -from test import IntegrationTest, client_context, unittest +from test import ( + IntegrationTest, + PyMongoTestCase, + client_context, + unittest, +) from pymongo.errors import PyMongoError -from pymongo.synchronous.helpers import _MAX_RETRIES +from pymongo.synchronous import helpers +from pymongo.synchronous.helpers import _MAX_RETRIES, _RetryPolicy, _TokenBucket _IS_SYNC = True @@ -150,6 +159,72 @@ def test_retry_overload_error_getMore(self): self.assertIn("Retryable", str(error.exception)) + @client_context.require_failCommand_appName + def test_limit_retry_command(self): + client = self.rs_or_single_client() + client._retry_policy.token_bucket.tokens = 1 + db = client.pymongo_test + db.t.insert_one({"x": 1}) + + # Ensure command is retried once overload error. + fail_many = mock_overload_error.copy() + fail_many["mode"] = {"times": 1} + with self.fail_point(fail_many): + db.command("find", "t") + + # Ensure command stops retrying when there are no tokens left. + fail_too_many = mock_overload_error.copy() + fail_too_many["mode"] = {"times": 2} + with self.fail_point(fail_too_many): + with self.assertRaises(PyMongoError) as error: + db.command("find", "t") + + self.assertIn("Retryable", str(error.exception)) + + +class TestRetryPolicy(PyMongoTestCase): + def test_retry_policy(self): + capacity = 10 + retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity)) + self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES) + self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL) + self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX) + for i in range(1, helpers._MAX_RETRIES + 1): + self.assertTrue(retry_policy.should_retry(i, 0)) + self.assertFalse(retry_policy.should_retry(helpers._MAX_RETRIES + 1, 0)) + for i in range(capacity - helpers._MAX_RETRIES): + self.assertTrue(retry_policy.should_retry(1, 0)) + # No tokens left, should not retry. + self.assertFalse(retry_policy.should_retry(1, 0)) + self.assertEqual(retry_policy.token_bucket.tokens, 0) + + # record_success should generate tokens. + for _ in range(int(2 / helpers.DEFAULT_RETRY_TOKEN_RETURN)): + retry_policy.record_success(retry=False) + self.assertAlmostEqual(retry_policy.token_bucket.tokens, 2) + for i in range(2): + self.assertTrue(retry_policy.should_retry(1, 0)) + self.assertFalse(retry_policy.should_retry(1, 0)) + + # Recording a successful retry should return 1 additional token. + retry_policy.record_success(retry=True) + self.assertAlmostEqual( + retry_policy.token_bucket.tokens, 1 + helpers.DEFAULT_RETRY_TOKEN_RETURN + ) + self.assertTrue(retry_policy.should_retry(1, 0)) + self.assertFalse(retry_policy.should_retry(1, 0)) + self.assertAlmostEqual(retry_policy.token_bucket.tokens, helpers.DEFAULT_RETRY_TOKEN_RETURN) + + def test_retry_policy_csot(self): + retry_policy = _RetryPolicy(_TokenBucket()) + self.assertTrue(retry_policy.should_retry(1, 0.5)) + with pymongo.timeout(0.5): + self.assertTrue(retry_policy.should_retry(1, 0)) + self.assertTrue(retry_policy.should_retry(1, 0.1)) + # Would exceed the timeout, should not retry. + self.assertFalse(retry_policy.should_retry(1, 1.0)) + self.assertTrue(retry_policy.should_retry(1, 1.0)) + if __name__ == "__main__": unittest.main() From c458379522a3ad0ff0cb577aea1f9fe22efd1bfe Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Tue, 26 Aug 2025 15:45:06 -0700 Subject: [PATCH 4/8] PYTHON-5505 Use proper RetryableError and SystemOverloadedError labels --- pymongo/asynchronous/helpers.py | 4 ++-- pymongo/asynchronous/mongo_client.py | 8 ++++---- pymongo/synchronous/helpers.py | 4 ++-- pymongo/synchronous/mongo_client.py | 8 ++++---- test/asynchronous/test_backpressure.py | 18 +++++++++--------- test/test_backpressure.py | 18 +++++++++--------- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 6ef3beacf5..96241b947c 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -177,11 +177,11 @@ async def inner(self: Any, *args: Any, **kwargs: Any) -> Any: await retry_policy.record_success(retry=attempt > 0) return res except PyMongoError as exc: - if not exc.has_error_label("Retryable"): + if not exc.has_error_label("RetryableError"): raise attempt += 1 delay = 0 - if exc.has_error_label("SystemOverloaded"): + if exc.has_error_label("SystemOverloadedError"): delay = retry_policy.backoff(attempt) if not await retry_policy.should_retry(attempt, delay): raise diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index d9994e9902..20ed199b20 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -2802,8 +2802,8 @@ async def run(self) -> T: if isinstance(exc, (ConnectionFailure, OperationFailure)): # ConnectionFailures do not supply a code property exc_code = getattr(exc, "code", None) - always_retryable = exc.has_error_label("Retryable") - overloaded = exc.has_error_label("SystemOverloaded") + always_retryable = exc.has_error_label("RetryableError") + overloaded = exc.has_error_label("SystemOverloadedError") if not always_retryable and ( self._is_not_eligible_for_retry() or ( @@ -2825,8 +2825,8 @@ async def run(self) -> T: ): exc_to_check = exc.error retryable_write_label = exc_to_check.has_error_label("RetryableWriteError") - always_retryable = exc_to_check.has_error_label("Retryable") - overloaded = exc_to_check.has_error_label("SystemOverloaded") + always_retryable = exc_to_check.has_error_label("RetryableError") + overloaded = exc_to_check.has_error_label("SystemOverloadedError") if not self._retryable and not always_retryable: raise if retryable_write_label or always_retryable: diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index 0a2cd71062..72d8978796 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -177,11 +177,11 @@ def inner(self: Any, *args: Any, **kwargs: Any) -> Any: retry_policy.record_success(retry=attempt > 0) return res except PyMongoError as exc: - if not exc.has_error_label("Retryable"): + if not exc.has_error_label("RetryableError"): raise attempt += 1 delay = 0 - if exc.has_error_label("SystemOverloaded"): + if exc.has_error_label("SystemOverloadedError"): delay = retry_policy.backoff(attempt) if not retry_policy.should_retry(attempt, delay): raise diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 9beda807ef..e4a6003c1c 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -2792,8 +2792,8 @@ def run(self) -> T: if isinstance(exc, (ConnectionFailure, OperationFailure)): # ConnectionFailures do not supply a code property exc_code = getattr(exc, "code", None) - always_retryable = exc.has_error_label("Retryable") - overloaded = exc.has_error_label("SystemOverloaded") + always_retryable = exc.has_error_label("RetryableError") + overloaded = exc.has_error_label("SystemOverloadedError") if not always_retryable and ( self._is_not_eligible_for_retry() or ( @@ -2815,8 +2815,8 @@ def run(self) -> T: ): exc_to_check = exc.error retryable_write_label = exc_to_check.has_error_label("RetryableWriteError") - always_retryable = exc_to_check.has_error_label("Retryable") - overloaded = exc_to_check.has_error_label("SystemOverloaded") + always_retryable = exc_to_check.has_error_label("RetryableError") + overloaded = exc_to_check.has_error_label("SystemOverloadedError") if not self._retryable and not always_retryable: raise if retryable_write_label or always_retryable: diff --git a/test/asynchronous/test_backpressure.py b/test/asynchronous/test_backpressure.py index 598236dbfe..11f8edde67 100644 --- a/test/asynchronous/test_backpressure.py +++ b/test/asynchronous/test_backpressure.py @@ -42,7 +42,7 @@ "data": { "failCommands": ["find", "insert", "update"], "errorCode": 462, # IngressRequestRateLimitExceeded - "errorLabels": ["Retryable"], + "errorLabels": ["RetryableError"], }, } @@ -67,7 +67,7 @@ async def test_retry_overload_error_command(self): with self.assertRaises(PyMongoError) as error: await self.db.command("find", "t") - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @async_client_context.require_failCommand_appName async def test_retry_overload_error_find(self): @@ -86,7 +86,7 @@ async def test_retry_overload_error_find(self): with self.assertRaises(PyMongoError) as error: await self.db.t.find_one() - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @async_client_context.require_failCommand_appName async def test_retry_overload_error_insert_one(self): @@ -105,12 +105,12 @@ async def test_retry_overload_error_insert_one(self): with self.assertRaises(PyMongoError) as error: await self.db.t.find_one() - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @async_client_context.require_failCommand_appName async def test_retry_overload_error_update_many(self): # Even though update_many is not a retryable write operation, it will - # still be retried via the "Retryable" error label. + # still be retried via the "RetryableError" error label. await self.db.t.insert_one({"x": 1}) # Ensure command is retried on overload error. @@ -126,7 +126,7 @@ async def test_retry_overload_error_update_many(self): with self.assertRaises(PyMongoError) as error: await self.db.t.update_many({}, {"$set": {"x": 2}}) - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @async_client_context.require_failCommand_appName async def test_retry_overload_error_getMore(self): @@ -140,7 +140,7 @@ async def test_retry_overload_error_getMore(self): "data": { "failCommands": ["getMore"], "errorCode": 462, # IngressRequestRateLimitExceeded - "errorLabels": ["Retryable"], + "errorLabels": ["RetryableError"], }, } cursor = coll.find(batch_size=2) @@ -157,7 +157,7 @@ async def test_retry_overload_error_getMore(self): with self.assertRaises(PyMongoError) as error: await cursor.to_list() - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @async_client_context.require_failCommand_appName async def test_limit_retry_command(self): @@ -179,7 +179,7 @@ async def test_limit_retry_command(self): with self.assertRaises(PyMongoError) as error: await db.command("find", "t") - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) class TestRetryPolicy(AsyncPyMongoTestCase): diff --git a/test/test_backpressure.py b/test/test_backpressure.py index 182ce424a9..fac1d6236d 100644 --- a/test/test_backpressure.py +++ b/test/test_backpressure.py @@ -42,7 +42,7 @@ "data": { "failCommands": ["find", "insert", "update"], "errorCode": 462, # IngressRequestRateLimitExceeded - "errorLabels": ["Retryable"], + "errorLabels": ["RetryableError"], }, } @@ -67,7 +67,7 @@ def test_retry_overload_error_command(self): with self.assertRaises(PyMongoError) as error: self.db.command("find", "t") - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @client_context.require_failCommand_appName def test_retry_overload_error_find(self): @@ -86,7 +86,7 @@ def test_retry_overload_error_find(self): with self.assertRaises(PyMongoError) as error: self.db.t.find_one() - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @client_context.require_failCommand_appName def test_retry_overload_error_insert_one(self): @@ -105,12 +105,12 @@ def test_retry_overload_error_insert_one(self): with self.assertRaises(PyMongoError) as error: self.db.t.find_one() - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @client_context.require_failCommand_appName def test_retry_overload_error_update_many(self): # Even though update_many is not a retryable write operation, it will - # still be retried via the "Retryable" error label. + # still be retried via the "RetryableError" error label. self.db.t.insert_one({"x": 1}) # Ensure command is retried on overload error. @@ -126,7 +126,7 @@ def test_retry_overload_error_update_many(self): with self.assertRaises(PyMongoError) as error: self.db.t.update_many({}, {"$set": {"x": 2}}) - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @client_context.require_failCommand_appName def test_retry_overload_error_getMore(self): @@ -140,7 +140,7 @@ def test_retry_overload_error_getMore(self): "data": { "failCommands": ["getMore"], "errorCode": 462, # IngressRequestRateLimitExceeded - "errorLabels": ["Retryable"], + "errorLabels": ["RetryableError"], }, } cursor = coll.find(batch_size=2) @@ -157,7 +157,7 @@ def test_retry_overload_error_getMore(self): with self.assertRaises(PyMongoError) as error: cursor.to_list() - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) @client_context.require_failCommand_appName def test_limit_retry_command(self): @@ -179,7 +179,7 @@ def test_limit_retry_command(self): with self.assertRaises(PyMongoError) as error: db.command("find", "t") - self.assertIn("Retryable", str(error.exception)) + self.assertIn("RetryableError", str(error.exception)) class TestRetryPolicy(PyMongoTestCase): From d267eb48332adece16bc931757282a27ee10c0e7 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Thu, 9 Oct 2025 11:43:03 -0500 Subject: [PATCH 5/8] PYTHON-5536 Avoid clearing the connection pool when the server connection rate limiter triggers (#2509) Co-authored-by: Iris <58442094+sleepyStick@users.noreply.github.com> Co-authored-by: Noah Stapp Co-authored-by: Shane Harvey Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .evergreen/run-tests.sh | 5 +- justfile | 37 +++++----- pymongo/asynchronous/pool.py | 71 ++++++++++++++---- pymongo/asynchronous/topology.py | 4 +- pymongo/logger.py | 2 + pymongo/monitoring.py | 3 + pymongo/network_layer.py | 11 ++- pymongo/pool_shared.py | 11 ++- pymongo/synchronous/pool.py | 71 ++++++++++++++---- pymongo/synchronous/topology.py | 4 +- test/asynchronous/test_encryption.py | 2 +- test/asynchronous/test_pooling.py | 72 +++++++++++++++++++ .../connection-logging.json | 8 ++- .../pool-create-min-size-error.json | 10 +-- .../unified/auth-network-error.json | 6 +- test/load_balancer/sdam-error-handling.json | 15 ++-- test/test_encryption.py | 2 +- test/test_pooling.py | 72 +++++++++++++++++++ tools/synchro.py | 2 +- 19 files changed, 345 insertions(+), 63 deletions(-) diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 2b7d856d41..a9f2ba2b5c 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -26,12 +26,9 @@ else fi # List the packages. -uv sync ${UV_ARGS} --reinstall +uv sync ${UV_ARGS} --reinstall --quiet uv pip list -# Ensure we go back to base environment after the test. -trap "uv sync" EXIT HUP - # Start the test runner. uv run ${UV_ARGS} .evergreen/scripts/run_tests.py "$@" diff --git a/justfile b/justfile index 74ebb48823..7ac5bd33ff 100644 --- a/justfile +++ b/justfile @@ -1,10 +1,11 @@ # See https://just.systems/man/en/ for instructions set shell := ["bash", "-c"] +# Do not modify the lock file when running justfile commands. +export UV_FROZEN := "1" # Commonly used command segments. -uv_run := "uv run --isolated --frozen " -typing_run := uv_run + "--group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd" -docs_run := uv_run + "--extra docs" +typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd" +docs_run := "uv run --extra docs" doc_build := "./doc/_build" mypy_args := "--install-types --non-interactive" @@ -13,51 +14,55 @@ mypy_args := "--install-types --non-interactive" default: @just --list +[private] +resync: + @uv sync --quiet --frozen + install: bash .evergreen/scripts/setup-dev-env.sh [group('docs')] -docs: +docs: && resync {{docs_run}} sphinx-build -W -b html doc {{doc_build}}/html [group('docs')] -docs-serve: +docs-serve: && resync {{docs_run}} sphinx-autobuild -W -b html doc --watch ./pymongo --watch ./bson --watch ./gridfs {{doc_build}}/serve [group('docs')] -docs-linkcheck: +docs-linkcheck: && resync {{docs_run}} sphinx-build -E -b linkcheck doc {{doc_build}}/linkcheck [group('typing')] -typing: +typing: && resync just typing-mypy just typing-pyright [group('typing')] -typing-mypy: +typing-mypy: && resync {{typing_run}} mypy {{mypy_args}} bson gridfs tools pymongo {{typing_run}} mypy {{mypy_args}} --config-file mypy_test.ini test {{typing_run}} mypy {{mypy_args}} test/test_typing.py test/test_typing_strict.py [group('typing')] -typing-pyright: +typing-pyright: && resync {{typing_run}} pyright test/test_typing.py test/test_typing_strict.py {{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py [group('lint')] -lint: - {{uv_run}} pre-commit run --all-files +lint: && resync + uv run pre-commit run --all-files [group('lint')] -lint-manual: - {{uv_run}} pre-commit run --all-files --hook-stage manual +lint-manual: && resync + uv run pre-commit run --all-files --hook-stage manual [group('test')] -test *args="-v --durations=5 --maxfail=10": - {{uv_run}} --extra test pytest {{args}} +test *args="-v --durations=5 --maxfail=10": && resync + uv run --extra test pytest {{args}} [group('test')] -run-tests *args: +run-tests *args: && resync bash ./.evergreen/run-tests.sh {{args}} [group('test')] diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 8c169b4c52..065686f43a 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -37,7 +37,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _handle_reauth +from pymongo.asynchronous.helpers import _backoff, _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -788,9 +788,9 @@ def __init__( # Enforces: maxConnecting # Also used for: clearing the wait queue self._max_connecting_cond = _async_create_condition(self.lock) - self._max_connecting = self.opts.max_connecting self._pending = 0 self._client_id = client_id + self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -846,6 +846,8 @@ async def _reset( async with self.size_cond: if self.closed: return + # Clear the backoff state. + self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -928,6 +930,11 @@ async def _reset( for conn in sockets: await conn.close_conn(ConnectionClosedReason.STALE) + @property + def max_connecting(self) -> int: + """The current max connecting limit for the pool.""" + return 1 if self._backoff else self.opts.max_connecting + async def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -994,7 +1001,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: async with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self._max_connecting: + if self._pending >= self.max_connecting: return self._pending += 1 incremented = True @@ -1022,6 +1029,30 @@ async def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() + def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + # Handle system overload condition for non-sdam pools. + # Look for an AutoReconnect error raised from a ConnectionResetError with + # errno == errno.ECONNRESET or raised from an OSError that we've created due to + # a closed connection. + # If found, set backoff and add error labels. + if self.is_sdam or type(error) != AutoReconnect: + return + self._backoff += 1 + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") + # Log the pool backoff message. + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_BACKOFF, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), + error=ConnectionClosedReason.POOL_BACKOFF, + ) + async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection: """Connect to Mongo and return a new AsyncConnection. @@ -1051,8 +1082,17 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A driverConnectionId=conn_id, ) + # Apply backoff if applicable. + if self._backoff: + await asyncio.sleep(_backoff(self._backoff)) + + # Pass a context to determine if we successfully create a configured socket. + context = dict(has_created_socket=False) + try: - networking_interface = await _configured_protocol_interface(self.address, self.opts) + networking_interface = await _configured_protocol_interface( + self.address, self.opts, context=context + ) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: async with self.lock: @@ -1073,10 +1113,11 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) + if context["has_created_socket"]: + self._handle_connection_error(error, "handshake", conn_id) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) - raise conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] @@ -1094,15 +1135,18 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A await conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as e: async with self.lock: self.active_contexts.discard(conn.cancel_context) + self._handle_connection_error(e, "hello", conn_id) await conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: await handler.client._topology.receive_cluster_time(conn._cluster_time) + # Clear the backoff state. + self._backoff = 0 return conn @contextlib.asynccontextmanager @@ -1279,12 +1323,12 @@ async def _get_conn( # to be checked back into the pool. async with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self._max_connecting): + while not (self.conns or self._pending < self.max_connecting): timeout = deadline - time.monotonic() if deadline else None if not await _async_cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self._max_connecting: + if self.conns or self._pending < self.max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1425,8 +1469,8 @@ async def _perished(self, conn: AsyncConnection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, to keep performance reasonable - we can't avoid AutoReconnects - completely anyway. + pool, or we are in backoff mode, to keep performance reasonable - + we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. @@ -1437,8 +1481,11 @@ async def _perished(self, conn: AsyncConnection) -> bool: await conn.close_conn(ConnectionClosedReason.IDLE) return True - if self._check_interval_seconds is not None and ( - self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds + check_interval_seconds = self._check_interval_seconds + if self._backoff: + check_interval_seconds = 0 + if check_interval_seconds is not None and ( + check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): if conn.conn_closed(): await conn.close_conn(ConnectionClosedReason.ERROR) diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 283aabc690..1e91bbe79b 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -890,7 +890,9 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None # Clear the pool. await server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError): + if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( + "SystemOverloadedError" + ): return # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." diff --git a/pymongo/logger.py b/pymongo/logger.py index 1b3fe43b86..ccfc45ed88 100644 --- a/pymongo/logger.py +++ b/pymongo/logger.py @@ -42,6 +42,7 @@ class _ConnectionStatusMessage(str, enum.Enum): POOL_READY = "Connection pool ready" POOL_CLOSED = "Connection pool closed" POOL_CLEARED = "Connection pool cleared" + POOL_BACKOFF = "Connection pool backoff" CONN_CREATED = "Connection created" CONN_READY = "Connection ready" @@ -88,6 +89,7 @@ class _SDAMStatusMessage(str, enum.Enum): _VERBOSE_CONNECTION_ERROR_REASONS = { ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed", ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed", + ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff", ConnectionClosedReason.STALE: "Connection pool was stale", ConnectionClosedReason.ERROR: "An error occurred while using the connection", ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection", diff --git a/pymongo/monitoring.py b/pymongo/monitoring.py index 46a78aea0b..0dfbbb915a 100644 --- a/pymongo/monitoring.py +++ b/pymongo/monitoring.py @@ -934,6 +934,9 @@ class ConnectionClosedReason: POOL_CLOSED = "poolClosed" """The pool was closed, making the connection no longer valid.""" + POOL_BACKOFF = "poolBackoff" + """The pool is in backoff mode.""" + class ConnectionCheckOutFailedReason: """An enum that defines values for `reason` on a diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 605b8dde9b..9bf46cbc3d 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -256,6 +256,7 @@ def __init__(self, timeout: Optional[float] = None): self._timeout = timeout self._closed = asyncio.get_running_loop().create_future() self._connection_lost = False + self._closing_exception = None def settimeout(self, timeout: float | None) -> None: self._timeout = timeout @@ -269,9 +270,11 @@ def close(self, exc: Optional[Exception] = None) -> None: self.transport.abort() self._resolve_pending(exc) self._connection_lost = True + self._closing_exception = exc # type:ignore[assignment] def connection_lost(self, exc: Optional[Exception] = None) -> None: self._resolve_pending(exc) + self._closing_exception = exc # type:ignore[assignment] if not self._closed.done(): self._closed.set_result(None) @@ -335,8 +338,11 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[ if self._done_messages: message = await self._done_messages.popleft() else: - if self.transport and self.transport.is_closing(): - raise OSError("connection is already closed") + if self._closed.done(): + if self._closing_exception: + raise self._closing_exception + else: + raise OSError("connection closed") read_waiter = asyncio.get_running_loop().create_future() self._pending_messages.append(read_waiter) try: @@ -474,6 +480,7 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None: else: msg.set_exception(exc) self._done_messages.append(msg) + self._pending_messages.clear() class PyMongoKMSProtocol(PyMongoBaseProtocol): diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 0536dc3835..c555b125df 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -250,6 +250,7 @@ async def _configured_protocol_interface( address: _Address, options: PoolOptions, protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol, + context: dict[str, bool] | None = None, ) -> AsyncNetworkingInterface: """Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface. @@ -261,6 +262,10 @@ async def _configured_protocol_interface( ssl_context = options._ssl_context timeout = options.socket_timeout + # Signal that we have created the socket successfully. + if context: + context["has_created_socket"] = True + if ssl_context is None: return AsyncNetworkingInterface( await asyncio.get_running_loop().create_connection( @@ -374,7 +379,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket def _configured_socket_interface( - address: _Address, options: PoolOptions, *args: Any + address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None ) -> NetworkingInterface: """Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket. @@ -385,6 +390,10 @@ def _configured_socket_interface( sock = _create_connection(address, options) ssl_context = options._ssl_context + # Signal that we have created the socket successfully. + if context: + context["has_created_socket"] = True + if ssl_context is None: sock.settimeout(options.socket_timeout) return NetworkingInterface(sock) diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index f35ca4d0fd..d0c517f186 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -84,7 +84,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _handle_reauth +from pymongo.synchronous.helpers import _backoff, _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -786,9 +786,9 @@ def __init__( # Enforces: maxConnecting # Also used for: clearing the wait queue self._max_connecting_cond = _create_condition(self.lock) - self._max_connecting = self.opts.max_connecting self._pending = 0 self._client_id = client_id + self._backoff = 0 if self.enabled_for_cmap: assert self.opts._event_listeners is not None self.opts._event_listeners.publish_pool_created( @@ -844,6 +844,8 @@ def _reset( with self.size_cond: if self.closed: return + # Clear the backoff state. + self._backoff = 0 if self.opts.pause_enabled and pause and not self.opts.load_balanced: old_state, self.state = self.state, PoolState.PAUSED self.gen.inc(service_id) @@ -926,6 +928,11 @@ def _reset( for conn in sockets: conn.close_conn(ConnectionClosedReason.STALE) + @property + def max_connecting(self) -> int: + """The current max connecting limit for the pool.""" + return 1 if self._backoff else self.opts.max_connecting + def update_is_writable(self, is_writable: Optional[bool]) -> None: """Updates the is_writable attribute on all sockets currently in the Pool. @@ -990,7 +997,7 @@ def remove_stale_sockets(self, reference_generation: int) -> None: with self._max_connecting_cond: # If maxConnecting connections are already being created # by this pool then try again later instead of waiting. - if self._pending >= self._max_connecting: + if self._pending >= self.max_connecting: return self._pending += 1 incremented = True @@ -1018,6 +1025,30 @@ def remove_stale_sockets(self, reference_generation: int) -> None: self.requests -= 1 self.size_cond.notify() + def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None: + # Handle system overload condition for non-sdam pools. + # Look for an AutoReconnect error raised from a ConnectionResetError with + # errno == errno.ECONNRESET or raised from an OSError that we've created due to + # a closed connection. + # If found, set backoff and add error labels. + if self.is_sdam or type(error) != AutoReconnect: + return + self._backoff += 1 + error._add_error_label("SystemOverloadedError") + error._add_error_label("RetryableError") + # Log the pool backoff message. + if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG): + _debug_log( + _CONNECTION_LOGGER, + message=_ConnectionStatusMessage.POOL_BACKOFF, + clientId=self._client_id, + serverHost=self.address[0], + serverPort=self.address[1], + driverConnectionId=conn_id, + reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF), + error=ConnectionClosedReason.POOL_BACKOFF, + ) + def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connection: """Connect to Mongo and return a new Connection. @@ -1047,8 +1078,17 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect driverConnectionId=conn_id, ) + # Apply backoff if applicable. + if self._backoff: + time.sleep(_backoff(self._backoff)) + + # Pass a context to determine if we successfully create a configured socket. + context = dict(has_created_socket=False) + try: - networking_interface = _configured_socket_interface(self.address, self.opts) + networking_interface = _configured_socket_interface( + self.address, self.opts, context=context + ) # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. except BaseException as error: with self.lock: @@ -1069,10 +1109,11 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR), error=ConnectionClosedReason.ERROR, ) + if context["has_created_socket"]: + self._handle_connection_error(error, "handshake", conn_id) if isinstance(error, (IOError, OSError, *SSLErrors)): details = _get_timeout_details(self.opts) _raise_connection_failure(self.address, error, timeout_details=details) - raise conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] @@ -1090,15 +1131,18 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect conn.authenticate() # Catch KeyboardInterrupt, CancelledError, etc. and cleanup. - except BaseException: + except BaseException as e: with self.lock: self.active_contexts.discard(conn.cancel_context) + self._handle_connection_error(e, "hello", conn_id) conn.close_conn(ConnectionClosedReason.ERROR) raise if handler: handler.client._topology.receive_cluster_time(conn._cluster_time) + # Clear the backoff state. + self._backoff = 0 return conn @contextlib.contextmanager @@ -1275,12 +1319,12 @@ def _get_conn( # to be checked back into the pool. with self._max_connecting_cond: self._raise_if_not_ready(checkout_started_time, emit_event=False) - while not (self.conns or self._pending < self._max_connecting): + while not (self.conns or self._pending < self.max_connecting): timeout = deadline - time.monotonic() if deadline else None if not _cond_wait(self._max_connecting_cond, timeout): # Timed out, notify the next thread to ensure a # timeout doesn't consume the condition. - if self.conns or self._pending < self._max_connecting: + if self.conns or self._pending < self.max_connecting: self._max_connecting_cond.notify() emitted_event = True self._raise_wait_queue_timeout(checkout_started_time) @@ -1421,8 +1465,8 @@ def _perished(self, conn: Connection) -> bool: :class:`~pymongo.errors.AutoReconnect` exceptions on server hiccups, etc. We only check if the socket was closed by an external error if it has been > 1 second since the socket was checked into the - pool, to keep performance reasonable - we can't avoid AutoReconnects - completely anyway. + pool, or we are in backoff mode, to keep performance reasonable - + we can't avoid AutoReconnects completely anyway. """ idle_time_seconds = conn.idle_time_seconds() # If socket is idle, open a new one. @@ -1433,8 +1477,11 @@ def _perished(self, conn: Connection) -> bool: conn.close_conn(ConnectionClosedReason.IDLE) return True - if self._check_interval_seconds is not None and ( - self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds + check_interval_seconds = self._check_interval_seconds + if self._backoff: + check_interval_seconds = 0 + if check_interval_seconds is not None and ( + check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds ): if conn.conn_closed(): conn.close_conn(ConnectionClosedReason.ERROR) diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index a4ca0e6e0f..0f6592dfc0 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -888,7 +888,9 @@ def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None: # Clear the pool. server.reset(service_id) elif isinstance(error, ConnectionFailure): - if isinstance(error, WaitQueueTimeoutError): + if isinstance(error, WaitQueueTimeoutError) or error.has_error_label( + "SystemOverloadedError" + ): return # "Client MUST replace the server's description with type Unknown # ... MUST NOT request an immediate check of the server." diff --git a/test/asynchronous/test_encryption.py b/test/asynchronous/test_encryption.py index f6afa4b2a3..adba824143 100644 --- a/test/asynchronous/test_encryption.py +++ b/test/asynchronous/test_encryption.py @@ -1276,7 +1276,7 @@ async def test_06_insert_fails_over_16MiB(self): with self.assertRaises(BulkWriteError) as ctx: await self.coll_encrypted.bulk_write([InsertOne(doc)]) err = ctx.exception.details["writeErrors"][0] - self.assertEqual(2, err["code"]) + self.assertIn(err["code"], [2, 10334]) self.assertIn("object to insert too large", err["errmsg"]) diff --git a/test/asynchronous/test_pooling.py b/test/asynchronous/test_pooling.py index 3193d9e3d5..6cbdf7a65c 100644 --- a/test/asynchronous/test_pooling.py +++ b/test/asynchronous/test_pooling.py @@ -29,6 +29,7 @@ from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError from pymongo.hello import HelloCompat from pymongo.lock import _async_create_lock +from pymongo.read_preferences import ReadPreference sys.path[0:0] = [""] @@ -513,6 +514,77 @@ async def test_connection_timeout_message(self): str(error.exception), ) + async def test_pool_check_backoff(self): + # Test that Pool recovers from two connection failures in a row. + # This exercises code at the end of Pool._check(). + cx_pool = await self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) + self.addAsyncCleanup(cx_pool.close) + + async with cx_pool.checkout() as conn: + # Simulate a closed socket without telling the Connection it's + # closed. + await conn.conn.close() + + # Enable backoff. + cx_pool._backoff = 1 + + # Swap pool's address with a bad one. + address, cx_pool.address = cx_pool.address, ("foo.com", 1234) + with self.assertRaises(AutoReconnect): + async with cx_pool.checkout(): + pass + + # Back to normal, semaphore was correctly released. + cx_pool.address = address + async with cx_pool.checkout(): + pass + + @async_client_context.require_failCommand_appName + async def test_pool_backoff_preserves_existing_connections(self): + client = await self.async_rs_or_single_client() + coll = self.db.t + pool = await async_get_pool(client) + await coll.insert_many([{"x": 1} for _ in range(10)]) + t = SocketGetter(self.c, pool) + await t.start() + while t.state != "connection": + await asyncio.sleep(0.1) + + assert not t.sock.conn_closed() + + # Mock a session establishment overload. + mock_connection_fail = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "closeConnection": True, + }, + } + + async with self.fail_point(mock_connection_fail): + await coll.find_one({}) + + # Make sure the pool is out of backoff state. + assert pool._backoff == 0 + + # Make sure the existing socket was not affected. + assert not t.sock.conn_closed() + + # Cleanup + await t.release_conn() + await t.join() + await pool.close() + + async def test_pool_backoff_limits_maxConnecting(self): + client = await self.async_rs_or_single_client(maxConnecting=10) + pool = await async_get_pool(client) + assert pool.max_connecting == 10 + pool._backoff = 1 + assert pool.max_connecting == 1 + pool._backoff = 0 + assert pool.max_connecting == 10 + await client.close() + class TestPoolMaxSize(_TestPoolingBase): async def test_max_pool_size(self): diff --git a/test/connection_logging/connection-logging.json b/test/connection_logging/connection-logging.json index 5799e834d7..60190c7dc0 100644 --- a/test/connection_logging/connection-logging.json +++ b/test/connection_logging/connection-logging.json @@ -331,7 +331,9 @@ "uriOptions": { "retryReads": false, "appname": "clientAppName", - "heartbeatFrequencyMS": 10000 + "heartbeatFrequencyMS": 10000, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeLogMessages": { "connection": "debug" @@ -355,7 +357,9 @@ "failCommands": [ "saslContinue" ], - "closeConnection": true, + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "clientAppName" } } diff --git a/test/connection_monitoring/pool-create-min-size-error.json b/test/connection_monitoring/pool-create-min-size-error.json index 1c744b850c..8ec958780d 100644 --- a/test/connection_monitoring/pool-create-min-size-error.json +++ b/test/connection_monitoring/pool-create-min-size-error.json @@ -9,21 +9,23 @@ ], "failPoint": { "configureFailPoint": "failCommand", - "mode": { - "times": 50 - }, + "mode": "alwaysOn", "data": { "failCommands": [ "isMaster", "hello" ], - "closeConnection": true, + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "poolCreateMinSizeErrorTest" } }, "poolOptions": { "minPoolSize": 1, "backgroundThreadIntervalMS": 50, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "appName": "poolCreateMinSizeErrorTest" }, "operations": [ diff --git a/test/discovery_and_monitoring/unified/auth-network-error.json b/test/discovery_and_monitoring/unified/auth-network-error.json index 84763af32e..656b291366 100644 --- a/test/discovery_and_monitoring/unified/auth-network-error.json +++ b/test/discovery_and_monitoring/unified/auth-network-error.json @@ -53,7 +53,9 @@ "failCommands": [ "saslContinue" ], - "closeConnection": true, + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "authNetworkErrorTest" } } @@ -75,6 +77,8 @@ ], "uriOptions": { "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "appname": "authNetworkErrorTest" } } diff --git a/test/load_balancer/sdam-error-handling.json b/test/load_balancer/sdam-error-handling.json index 5892dcacd6..b9842b8017 100644 --- a/test/load_balancer/sdam-error-handling.json +++ b/test/load_balancer/sdam-error-handling.json @@ -32,6 +32,8 @@ "useMultipleMongoses": false, "uriOptions": { "appname": "lbSDAMErrorTestClient", + "socketTimeoutMS": 500, + "connectTimeoutMS": 500, "retryWrites": false }, "observeEvents": [ @@ -64,7 +66,9 @@ "id": "multiClient", "useMultipleMongoses": true, "uriOptions": { - "retryWrites": false + "retryWrites": false, + "socketTimeoutMS": 500, + "connectTimeoutMS": 500 }, "observeEvents": [ "connectionCreatedEvent", @@ -282,7 +286,8 @@ "isMaster", "hello" ], - "closeConnection": true, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "lbSDAMErrorTestClient" } } @@ -345,7 +350,8 @@ "failCommands": [ "saslContinue" ], - "closeConnection": true, + "blockConnection": true, + "blockTimeMS": 1000, "appName": "lbSDAMErrorTestClient" } } @@ -406,7 +412,8 @@ "failCommands": [ "getMore" ], - "closeConnection": true + "closeConnection": true, + "appName": "lbSDAMErrorTestClient" } } } diff --git a/test/test_encryption.py b/test/test_encryption.py index 5c8813203d..1a307f56ee 100644 --- a/test/test_encryption.py +++ b/test/test_encryption.py @@ -1272,7 +1272,7 @@ def test_06_insert_fails_over_16MiB(self): with self.assertRaises(BulkWriteError) as ctx: self.coll_encrypted.bulk_write([InsertOne(doc)]) err = ctx.exception.details["writeErrors"][0] - self.assertEqual(2, err["code"]) + self.assertIn(err["code"], [2, 10334]) self.assertIn("object to insert too large", err["errmsg"]) diff --git a/test/test_pooling.py b/test/test_pooling.py index cb5b206996..f3bfcf4ba2 100644 --- a/test/test_pooling.py +++ b/test/test_pooling.py @@ -29,6 +29,7 @@ from pymongo.errors import AutoReconnect, ConnectionFailure, DuplicateKeyError from pymongo.hello import HelloCompat from pymongo.lock import _create_lock +from pymongo.read_preferences import ReadPreference sys.path[0:0] = [""] @@ -511,6 +512,77 @@ def test_connection_timeout_message(self): str(error.exception), ) + def test_pool_check_backoff(self): + # Test that Pool recovers from two connection failures in a row. + # This exercises code at the end of Pool._check(). + cx_pool = self.create_pool(max_pool_size=1, connect_timeout=1, wait_queue_timeout=1) + self.addCleanup(cx_pool.close) + + with cx_pool.checkout() as conn: + # Simulate a closed socket without telling the Connection it's + # closed. + conn.conn.close() + + # Enable backoff. + cx_pool._backoff = 1 + + # Swap pool's address with a bad one. + address, cx_pool.address = cx_pool.address, ("foo.com", 1234) + with self.assertRaises(AutoReconnect): + with cx_pool.checkout(): + pass + + # Back to normal, semaphore was correctly released. + cx_pool.address = address + with cx_pool.checkout(): + pass + + @client_context.require_failCommand_appName + def test_pool_backoff_preserves_existing_connections(self): + client = self.rs_or_single_client() + coll = self.db.t + pool = get_pool(client) + coll.insert_many([{"x": 1} for _ in range(10)]) + t = SocketGetter(self.c, pool) + t.start() + while t.state != "connection": + time.sleep(0.1) + + assert not t.sock.conn_closed() + + # Mock a session establishment overload. + mock_connection_fail = { + "configureFailPoint": "failCommand", + "mode": {"times": 1}, + "data": { + "closeConnection": True, + }, + } + + with self.fail_point(mock_connection_fail): + coll.find_one({}) + + # Make sure the pool is out of backoff state. + assert pool._backoff == 0 + + # Make sure the existing socket was not affected. + assert not t.sock.conn_closed() + + # Cleanup + t.release_conn() + t.join() + pool.close() + + def test_pool_backoff_limits_maxConnecting(self): + client = self.rs_or_single_client(maxConnecting=10) + pool = get_pool(client) + assert pool.max_connecting == 10 + pool._backoff = 1 + assert pool.max_connecting == 1 + pool._backoff = 0 + assert pool.max_connecting == 10 + client.close() + class TestPoolMaxSize(_TestPoolingBase): def test_max_pool_size(self): diff --git a/tools/synchro.py b/tools/synchro.py index 44698134cd..968d0e362f 100644 --- a/tools/synchro.py +++ b/tools/synchro.py @@ -341,7 +341,7 @@ def translate_async_sleeps(lines: list[str]) -> list[str]: sleeps = [line for line in lines if "asyncio.sleep" in line] for line in sleeps: - res = re.search(r"asyncio.sleep\(([^()]*)\)", line) + res = re.search(r"asyncio\.sleep\(\s*(.*?)\)", line) if res: old = res[0] index = lines.index(line) From 27785aede625d48bd3c7243d3cab2704ab04a3ac Mon Sep 17 00:00:00 2001 From: Shane Harvey Date: Thu, 23 Oct 2025 10:54:25 -0700 Subject: [PATCH 6/8] PYTHON-5629 Increase max overload retries from 3 to 5 and initial delay from 50ms to 100ms (#2599) --- pymongo/asynchronous/helpers.py | 4 ++-- pymongo/synchronous/helpers.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 96241b947c..b29d02ea2c 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -77,8 +77,8 @@ async def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) -_MAX_RETRIES = 3 -_BACKOFF_INITIAL = 0.05 +_MAX_RETRIES = 5 +_BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 # DRIVERS-3240 will determine these defaults. DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index 72d8978796..2def3a8c14 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -77,8 +77,8 @@ def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) -_MAX_RETRIES = 3 -_BACKOFF_INITIAL = 0.05 +_MAX_RETRIES = 5 +_BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 # DRIVERS-3240 will determine these defaults. DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 From cf2d75601a006c33a84e970d44ba3ffd06a3dfb3 Mon Sep 17 00:00:00 2001 From: Jib Date: Mon, 23 Mar 2026 17:00:21 -0400 Subject: [PATCH 7/8] PYTHON-5767 Implement DRIVERS-3427 phase 1 rollout changes - Change MAX_RETRIES default from 5 to 2 - Add configurable maxAdaptiveRetries client option (default: 2) - Add enableOverloadRetargeting client option (default: false) - Add retry metadata ("retry": N) to outgoing command bodies on retry - Remove _retry_overload decorator from collection/database methods Co-Authored-By: Claude Code --- pymongo/asynchronous/collection.py | 4 ---- pymongo/asynchronous/database.py | 5 ----- pymongo/asynchronous/helpers.py | 7 ++++++- pymongo/asynchronous/mongo_client.py | 10 ++++++++-- pymongo/asynchronous/pool.py | 5 ++++- pymongo/client_options.py | 26 ++++++++++++++++++++++++++ pymongo/common.py | 8 ++++++++ pymongo/synchronous/collection.py | 4 ---- pymongo/synchronous/database.py | 5 ----- pymongo/synchronous/helpers.py | 7 ++++++- pymongo/synchronous/mongo_client.py | 10 ++++++++-- pymongo/synchronous/pool.py | 5 ++++- 12 files changed, 70 insertions(+), 26 deletions(-) diff --git a/pymongo/asynchronous/collection.py b/pymongo/asynchronous/collection.py index 82a6872b1b..127136dd48 100644 --- a/pymongo/asynchronous/collection.py +++ b/pymongo/asynchronous/collection.py @@ -57,7 +57,6 @@ AsyncCursor, AsyncRawBatchCursor, ) -from pymongo.asynchronous.helpers import _retry_overload from pymongo.collation import validate_collation_or_none from pymongo.common import _ecoc_coll_name, _esc_coll_name from pymongo.errors import ( @@ -2228,7 +2227,6 @@ async def create_indexes( return await self._create_indexes(indexes, session, **kwargs) @_csot.apply - @_retry_overload async def _create_indexes( self, indexes: Sequence[IndexModel], session: Optional[AsyncClientSession], **kwargs: Any ) -> list[str]: @@ -2480,7 +2478,6 @@ async def drop_index( await self._drop_index(index_or_name, session, comment, **kwargs) @_csot.apply - @_retry_overload async def _drop_index( self, index_or_name: _IndexKeyHint, @@ -3104,7 +3101,6 @@ async def aggregate_raw_batches( ) @_csot.apply - @_retry_overload async def rename( self, new_name: str, diff --git a/pymongo/asynchronous/database.py b/pymongo/asynchronous/database.py index fa927ce61e..2c482f0415 100644 --- a/pymongo/asynchronous/database.py +++ b/pymongo/asynchronous/database.py @@ -38,7 +38,6 @@ from pymongo.asynchronous.change_stream import AsyncDatabaseChangeStream from pymongo.asynchronous.collection import AsyncCollection from pymongo.asynchronous.command_cursor import AsyncCommandCursor -from pymongo.asynchronous.helpers import _retry_overload from pymongo.common import _ecoc_coll_name, _esc_coll_name from pymongo.database_shared import _check_name, _CodecDocumentType from pymongo.errors import CollectionInvalid, InvalidOperation @@ -479,7 +478,6 @@ async def watch( return change_stream @_csot.apply - @_retry_overload async def create_collection( self, name: str, @@ -822,7 +820,6 @@ async def command( ... @_csot.apply - @_retry_overload async def command( self, command: Union[str, MutableMapping[str, Any]], @@ -959,7 +956,6 @@ async def inner( ) @_csot.apply - @_retry_overload async def cursor_command( self, command: Union[str, MutableMapping[str, Any]], @@ -1283,7 +1279,6 @@ async def inner( return await self.client._retryable_write(False, inner, session, _Op.DROP) @_csot.apply - @_retry_overload async def drop_collection( self, name_or_collection: Union[str, AsyncCollection[_DocumentTypeArg]], diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 2c01c19b7a..74a19328bb 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -22,6 +22,7 @@ import socket import sys import time as time # noqa: PLC0414 # needed in sync version +from contextvars import ContextVar from typing import ( Any, Callable, @@ -76,9 +77,13 @@ async def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) -_MAX_RETRIES = 5 +_MAX_RETRIES = 2 _BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 + +# Context variable used to pass the current retry attempt number to conn.command() +# so that retry metadata can be injected into outgoing command bodies. +_RETRY_ATTEMPT: ContextVar[int] = ContextVar("_retry_attempt", default=0) DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 DEFAULT_RETRY_TOKEN_RETURN = 0.1 diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 8936068af6..b4bd8d00c4 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -69,6 +69,7 @@ from pymongo.asynchronous.client_session import _SESSION, _EmptyServerSession from pymongo.asynchronous.command_cursor import AsyncCommandCursor from pymongo.asynchronous.helpers import ( + _RETRY_ATTEMPT, _RetryPolicy, _TokenBucket, ) @@ -895,7 +896,9 @@ def __init__( ) self._retry_policy = _RetryPolicy( - _TokenBucket(), adaptive_retry=self._options.adaptive_retries + _TokenBucket(), + attempts=self._options.max_adaptive_retries, + adaptive_retry=self._options.adaptive_retries, ) self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name) @@ -2820,6 +2823,7 @@ async def run(self) -> T: while True: self._check_last_error(check_csot=True) + retry_token = _RETRY_ATTEMPT.set(self._attempt_number) try: res = await self._read() if self._is_read else await self._write() await self._retry_policy.record_success(self._attempt_number > 0) @@ -2930,7 +2934,7 @@ async def run(self) -> T: transaction.set_starting() transaction.attempt = 0 - if ( + if self._client.options.enable_overload_retargeting and ( self._server is not None and self._client.topology_description.topology_type_name == "Sharded" or exc.has_error_label("SystemOverloadedError") @@ -2946,6 +2950,8 @@ async def run(self) -> T: else: raise await asyncio.sleep(delay) + finally: + _RETRY_ATTEMPT.reset(retry_token) def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 3c1a85246e..e95db69117 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -39,7 +39,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _handle_reauth +from pymongo.asynchronous.helpers import _RETRY_ATTEMPT, _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -395,6 +395,9 @@ async def command( if session: session._apply_to(spec, retryable_write, read_preference, self) self.send_cluster_time(spec, session, client) + retry_attempt = _RETRY_ATTEMPT.get() + if retry_attempt > 0: + spec["retry"] = retry_attempt listeners = self.listeners if publish_events else None unacknowledged = bool(write_concern and not write_concern.acknowledged) if self.op_msg_enabled: diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 1e488c2b8f..58810ee702 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -240,6 +240,16 @@ def __init__( if "adaptive_retries" in options else options.get("adaptiveretries", common.ADAPTIVE_RETRIES) ) + self.__max_adaptive_retries = ( + options.get("max_adaptive_retries", common.MAX_ADAPTIVE_RETRIES) + if "max_adaptive_retries" in options + else options.get("maxadaptiveretries", common.MAX_ADAPTIVE_RETRIES) + ) + self.__enable_overload_retargeting = ( + options.get("enable_overload_retargeting", common.ENABLE_OVERLOAD_RETARGETING) + if "enable_overload_retargeting" in options + else options.get("enableoverloadretargeting", common.ENABLE_OVERLOAD_RETARGETING) + ) @property def _options(self) -> Mapping[str, Any]: @@ -359,3 +369,19 @@ def adaptive_retries(self) -> bool: .. versionadded:: 4.XX """ return self.__adaptive_retries + + @property + def max_adaptive_retries(self) -> int: + """The configured maxAdaptiveRetries option. + + .. versionadded:: 4.XX + """ + return self.__max_adaptive_retries + + @property + def enable_overload_retargeting(self) -> bool: + """The configured enableOverloadRetargeting option. + + .. versionadded:: 4.XX + """ + return self.__enable_overload_retargeting diff --git a/pymongo/common.py b/pymongo/common.py index 8b9797682f..a7953a0507 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -143,6 +143,12 @@ # Default value for adaptiveRetries ADAPTIVE_RETRIES = False +# Default value for maxAdaptiveRetries +MAX_ADAPTIVE_RETRIES = 2 + +# Default value for enableOverloadRetargeting +ENABLE_OVERLOAD_RETARGETING = False + # Auth mechanism properties that must raise an error instead of warning if they invalidate. _MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"] @@ -776,6 +782,8 @@ def validate_server_monitoring_mode(option: str, value: str) -> str: "auto_encryption_opts": validate_auto_encryption_opts_or_none, "authoidcallowedhosts": validate_list, "adaptive_retries": validate_boolean_or_string, + "max_adaptive_retries": validate_non_negative_integer, + "enable_overload_retargeting": validate_boolean_or_string, } # Dictionary where keys are any URI option name, and values are the diff --git a/pymongo/synchronous/collection.py b/pymongo/synchronous/collection.py index a37d7bcfd9..34fd7190d1 100644 --- a/pymongo/synchronous/collection.py +++ b/pymongo/synchronous/collection.py @@ -88,7 +88,6 @@ Cursor, RawBatchCursor, ) -from pymongo.synchronous.helpers import _retry_overload from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline from pymongo.write_concern import DEFAULT_WRITE_CONCERN, WriteConcern, validate_boolean @@ -2225,7 +2224,6 @@ def create_indexes( return self._create_indexes(indexes, session, **kwargs) @_csot.apply - @_retry_overload def _create_indexes( self, indexes: Sequence[IndexModel], session: Optional[ClientSession], **kwargs: Any ) -> list[str]: @@ -2475,7 +2473,6 @@ def drop_index( self._drop_index(index_or_name, session, comment, **kwargs) @_csot.apply - @_retry_overload def _drop_index( self, index_or_name: _IndexKeyHint, @@ -3097,7 +3094,6 @@ def aggregate_raw_batches( ) @_csot.apply - @_retry_overload def rename( self, new_name: str, diff --git a/pymongo/synchronous/database.py b/pymongo/synchronous/database.py index 9d2815ad54..cc041a2e30 100644 --- a/pymongo/synchronous/database.py +++ b/pymongo/synchronous/database.py @@ -43,7 +43,6 @@ from pymongo.synchronous.change_stream import DatabaseChangeStream from pymongo.synchronous.collection import Collection from pymongo.synchronous.command_cursor import CommandCursor -from pymongo.synchronous.helpers import _retry_overload from pymongo.typings import _CollationIn, _DocumentType, _DocumentTypeArg, _Pipeline if TYPE_CHECKING: @@ -479,7 +478,6 @@ def watch( return change_stream @_csot.apply - @_retry_overload def create_collection( self, name: str, @@ -822,7 +820,6 @@ def command( ... @_csot.apply - @_retry_overload def command( self, command: Union[str, MutableMapping[str, Any]], @@ -959,7 +956,6 @@ def inner( ) @_csot.apply - @_retry_overload def cursor_command( self, command: Union[str, MutableMapping[str, Any]], @@ -1280,7 +1276,6 @@ def inner( return self.client._retryable_write(False, inner, session, _Op.DROP) @_csot.apply - @_retry_overload def drop_collection( self, name_or_collection: Union[str, Collection[_DocumentTypeArg]], diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index 1a27fc11a5..bf83ca802c 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -22,6 +22,7 @@ import socket import sys import time as time # noqa: PLC0414 # needed in sync version +from contextvars import ContextVar from typing import ( Any, Callable, @@ -76,9 +77,13 @@ def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) -_MAX_RETRIES = 5 +_MAX_RETRIES = 2 _BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 + +# Context variable used to pass the current retry attempt number to conn.command() +# so that retry metadata can be injected into outgoing command bodies. +_RETRY_ATTEMPT: ContextVar[int] = ContextVar("_retry_attempt", default=0) DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 DEFAULT_RETRY_TOKEN_RETURN = 0.1 diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index ab74650780..33727d641c 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -112,6 +112,7 @@ from pymongo.synchronous.client_session import _SESSION, _EmptyServerSession from pymongo.synchronous.command_cursor import CommandCursor from pymongo.synchronous.helpers import ( + _RETRY_ATTEMPT, _RetryPolicy, _TokenBucket, ) @@ -895,7 +896,9 @@ def __init__( ) self._retry_policy = _RetryPolicy( - _TokenBucket(), adaptive_retry=self._options.adaptive_retries + _TokenBucket(), + attempts=self._options.max_adaptive_retries, + adaptive_retry=self._options.adaptive_retries, ) self._init_based_on_options(self._seeds, srv_max_hosts, srv_service_name) @@ -2810,6 +2813,7 @@ def run(self) -> T: while True: self._check_last_error(check_csot=True) + retry_token = _RETRY_ATTEMPT.set(self._attempt_number) try: res = self._read() if self._is_read else self._write() self._retry_policy.record_success(self._attempt_number > 0) @@ -2920,7 +2924,7 @@ def run(self) -> T: transaction.set_starting() transaction.attempt = 0 - if ( + if self._client.options.enable_overload_retargeting and ( self._server is not None and self._client.topology_description.topology_type_name == "Sharded" or exc.has_error_label("SystemOverloadedError") @@ -2936,6 +2940,8 @@ def run(self) -> T: else: raise time.sleep(delay) + finally: + _RETRY_ATTEMPT.reset(retry_token) def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index d33cb59a98..bf93d302a5 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -88,7 +88,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _handle_reauth +from pymongo.synchronous.helpers import _RETRY_ATTEMPT, _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -395,6 +395,9 @@ def command( if session: session._apply_to(spec, retryable_write, read_preference, self) self.send_cluster_time(spec, session, client) + retry_attempt = _RETRY_ATTEMPT.get() + if retry_attempt > 0: + spec["retry"] = retry_attempt listeners = self.listeners if publish_events else None unacknowledged = bool(write_concern and not write_concern.acknowledged) if self.op_msg_enabled: From 0f039868a10ec719ca633403bb5ae57b9113bf6e Mon Sep 17 00:00:00 2001 From: Jib Date: Wed, 25 Mar 2026 12:06:38 -0400 Subject: [PATCH 8/8] removed all retryable tracking logic --- pymongo/asynchronous/helpers.py | 9 ++------- pymongo/asynchronous/mongo_client.py | 6 +----- pymongo/asynchronous/pool.py | 5 +---- pymongo/client_options.py | 14 +++++++------- pymongo/common.py | 6 +++--- pymongo/synchronous/helpers.py | 9 ++------- pymongo/synchronous/mongo_client.py | 6 +----- pymongo/synchronous/pool.py | 5 +---- 8 files changed, 18 insertions(+), 42 deletions(-) diff --git a/pymongo/asynchronous/helpers.py b/pymongo/asynchronous/helpers.py index 74a19328bb..5df68e7c7d 100644 --- a/pymongo/asynchronous/helpers.py +++ b/pymongo/asynchronous/helpers.py @@ -22,7 +22,6 @@ import socket import sys import time as time # noqa: PLC0414 # needed in sync version -from contextvars import ContextVar from typing import ( Any, Callable, @@ -30,7 +29,7 @@ cast, ) -from pymongo import _csot +from pymongo import _csot, common from pymongo.errors import ( OperationFailure, ) @@ -77,13 +76,9 @@ async def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) -_MAX_RETRIES = 2 _BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 -# Context variable used to pass the current retry attempt number to conn.command() -# so that retry metadata can be injected into outgoing command bodies. -_RETRY_ATTEMPT: ContextVar[int] = ContextVar("_retry_attempt", default=0) DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 DEFAULT_RETRY_TOKEN_RETURN = 0.1 @@ -133,7 +128,7 @@ class _RetryPolicy: def __init__( self, token_bucket: _TokenBucket, - attempts: int = _MAX_RETRIES, + attempts: int = common._MAX_RETRIES, backoff_initial: float = _BACKOFF_INITIAL, backoff_max: float = _BACKOFF_MAX, adaptive_retry: bool = False, diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index b4bd8d00c4..8717526861 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -69,7 +69,6 @@ from pymongo.asynchronous.client_session import _SESSION, _EmptyServerSession from pymongo.asynchronous.command_cursor import AsyncCommandCursor from pymongo.asynchronous.helpers import ( - _RETRY_ATTEMPT, _RetryPolicy, _TokenBucket, ) @@ -897,7 +896,7 @@ def __init__( self._retry_policy = _RetryPolicy( _TokenBucket(), - attempts=self._options.max_adaptive_retries, + attempts=self._options.max_retries, adaptive_retry=self._options.adaptive_retries, ) @@ -2823,7 +2822,6 @@ async def run(self) -> T: while True: self._check_last_error(check_csot=True) - retry_token = _RETRY_ATTEMPT.set(self._attempt_number) try: res = await self._read() if self._is_read else await self._write() await self._retry_policy.record_success(self._attempt_number > 0) @@ -2950,8 +2948,6 @@ async def run(self) -> T: else: raise await asyncio.sleep(delay) - finally: - _RETRY_ATTEMPT.reset(retry_token) def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index e95db69117..3c1a85246e 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -39,7 +39,7 @@ from bson import DEFAULT_CODEC_OPTIONS from pymongo import _csot, helpers_shared from pymongo.asynchronous.client_session import _validate_session_write_concern -from pymongo.asynchronous.helpers import _RETRY_ATTEMPT, _handle_reauth +from pymongo.asynchronous.helpers import _handle_reauth from pymongo.asynchronous.network import command from pymongo.common import ( MAX_BSON_SIZE, @@ -395,9 +395,6 @@ async def command( if session: session._apply_to(spec, retryable_write, read_preference, self) self.send_cluster_time(spec, session, client) - retry_attempt = _RETRY_ATTEMPT.get() - if retry_attempt > 0: - spec["retry"] = retry_attempt listeners = self.listeners if publish_events else None unacknowledged = bool(write_concern and not write_concern.acknowledged) if self.op_msg_enabled: diff --git a/pymongo/client_options.py b/pymongo/client_options.py index 58810ee702..eccda5f462 100644 --- a/pymongo/client_options.py +++ b/pymongo/client_options.py @@ -240,10 +240,10 @@ def __init__( if "adaptive_retries" in options else options.get("adaptiveretries", common.ADAPTIVE_RETRIES) ) - self.__max_adaptive_retries = ( - options.get("max_adaptive_retries", common.MAX_ADAPTIVE_RETRIES) - if "max_adaptive_retries" in options - else options.get("maxadaptiveretries", common.MAX_ADAPTIVE_RETRIES) + self.__max_retries = ( + options.get("max_retries", common._MAX_RETRIES) + if "max_retries" in options + else options.get("maxretries", common._MAX_RETRIES) ) self.__enable_overload_retargeting = ( options.get("enable_overload_retargeting", common.ENABLE_OVERLOAD_RETARGETING) @@ -371,12 +371,12 @@ def adaptive_retries(self) -> bool: return self.__adaptive_retries @property - def max_adaptive_retries(self) -> int: - """The configured maxAdaptiveRetries option. + def max_retries(self) -> int: + """The configured maxRetries option. .. versionadded:: 4.XX """ - return self.__max_adaptive_retries + return self.__max_retries @property def enable_overload_retargeting(self) -> bool: diff --git a/pymongo/common.py b/pymongo/common.py index a7953a0507..7d28407045 100644 --- a/pymongo/common.py +++ b/pymongo/common.py @@ -143,8 +143,8 @@ # Default value for adaptiveRetries ADAPTIVE_RETRIES = False -# Default value for maxAdaptiveRetries -MAX_ADAPTIVE_RETRIES = 2 +# Default value for max retries +_MAX_RETRIES = 2 # Default value for enableOverloadRetargeting ENABLE_OVERLOAD_RETARGETING = False @@ -782,7 +782,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str: "auto_encryption_opts": validate_auto_encryption_opts_or_none, "authoidcallowedhosts": validate_list, "adaptive_retries": validate_boolean_or_string, - "max_adaptive_retries": validate_non_negative_integer, + "max_retries": validate_non_negative_integer, "enable_overload_retargeting": validate_boolean_or_string, } diff --git a/pymongo/synchronous/helpers.py b/pymongo/synchronous/helpers.py index bf83ca802c..29b78caf18 100644 --- a/pymongo/synchronous/helpers.py +++ b/pymongo/synchronous/helpers.py @@ -22,7 +22,6 @@ import socket import sys import time as time # noqa: PLC0414 # needed in sync version -from contextvars import ContextVar from typing import ( Any, Callable, @@ -30,7 +29,7 @@ cast, ) -from pymongo import _csot +from pymongo import _csot, common from pymongo.errors import ( OperationFailure, ) @@ -77,13 +76,9 @@ def inner(*args: Any, **kwargs: Any) -> Any: return cast(F, inner) -_MAX_RETRIES = 2 _BACKOFF_INITIAL = 0.1 _BACKOFF_MAX = 10 -# Context variable used to pass the current retry attempt number to conn.command() -# so that retry metadata can be injected into outgoing command bodies. -_RETRY_ATTEMPT: ContextVar[int] = ContextVar("_retry_attempt", default=0) DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0 DEFAULT_RETRY_TOKEN_RETURN = 0.1 @@ -133,7 +128,7 @@ class _RetryPolicy: def __init__( self, token_bucket: _TokenBucket, - attempts: int = _MAX_RETRIES, + attempts: int = common._MAX_RETRIES, backoff_initial: float = _BACKOFF_INITIAL, backoff_max: float = _BACKOFF_MAX, adaptive_retry: bool = False, diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 33727d641c..44e82cb49c 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -112,7 +112,6 @@ from pymongo.synchronous.client_session import _SESSION, _EmptyServerSession from pymongo.synchronous.command_cursor import CommandCursor from pymongo.synchronous.helpers import ( - _RETRY_ATTEMPT, _RetryPolicy, _TokenBucket, ) @@ -897,7 +896,7 @@ def __init__( self._retry_policy = _RetryPolicy( _TokenBucket(), - attempts=self._options.max_adaptive_retries, + attempts=self._options.max_retries, adaptive_retry=self._options.adaptive_retries, ) @@ -2813,7 +2812,6 @@ def run(self) -> T: while True: self._check_last_error(check_csot=True) - retry_token = _RETRY_ATTEMPT.set(self._attempt_number) try: res = self._read() if self._is_read else self._write() self._retry_policy.record_success(self._attempt_number > 0) @@ -2940,8 +2938,6 @@ def run(self) -> T: else: raise time.sleep(delay) - finally: - _RETRY_ATTEMPT.reset(retry_token) def _is_not_eligible_for_retry(self) -> bool: """Checks if the exchange is not eligible for retry""" diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index bf93d302a5..d33cb59a98 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -88,7 +88,7 @@ from pymongo.server_type import SERVER_TYPE from pymongo.socket_checker import SocketChecker from pymongo.synchronous.client_session import _validate_session_write_concern -from pymongo.synchronous.helpers import _RETRY_ATTEMPT, _handle_reauth +from pymongo.synchronous.helpers import _handle_reauth from pymongo.synchronous.network import command if TYPE_CHECKING: @@ -395,9 +395,6 @@ def command( if session: session._apply_to(spec, retryable_write, read_preference, self) self.send_cluster_time(spec, session, client) - retry_attempt = _RETRY_ATTEMPT.get() - if retry_attempt > 0: - spec["retry"] = retry_attempt listeners = self.listeners if publish_events else None unacknowledged = bool(write_concern and not write_concern.acknowledged) if self.op_msg_enabled: