From 6acd2bd5d131507365001a0c4a91c20de37ddf13 Mon Sep 17 00:00:00 2001 From: Jeffrey 'Alex' Clark Date: Thu, 25 Jun 2026 12:27:54 -0400 Subject: [PATCH] PYTHON-5856 Move cursor execution logic into cursor class Move the command preparation and response construction that lived in Server.run_operation into _AsyncCursorBase._run_with_conn, so cursors call run_cursor_command directly without routing through Server. - Extract _CURSOR_DOC_FIELDS, _split_message, and _operation_to_command as module-level helpers in cursor_base.py - Add abstract _unpack_response to _AsyncCursorBase to make the interface explicit - Add _run_with_conn (carrying the @_handle_reauth decorator) to _AsyncCursorBase; this is the new home for all of run_operation's logic - client._run_operation now accepts execute_fn(conn, op, rp) -> Response instead of routing through server.run_operation - Remove Server.run_operation, Server.operation_to_command, and Server._split_message entirely --- pymongo/asynchronous/command_cursor.py | 2 +- pymongo/asynchronous/cursor.py | 2 +- pymongo/asynchronous/cursor_base.py | 119 +++++++++++++++++++++- pymongo/asynchronous/mongo_client.py | 25 ++--- pymongo/asynchronous/server.py | 136 +------------------------ pymongo/synchronous/command_cursor.py | 4 +- pymongo/synchronous/cursor.py | 4 +- pymongo/synchronous/cursor_base.py | 119 +++++++++++++++++++++- pymongo/synchronous/mongo_client.py | 25 ++--- pymongo/synchronous/server.py | 136 +------------------------ test/asynchronous/test_client.py | 2 +- test/test_client.py | 2 +- 12 files changed, 255 insertions(+), 321 deletions(-) diff --git a/pymongo/asynchronous/command_cursor.py b/pymongo/asynchronous/command_cursor.py index fb85755362..317608047d 100644 --- a/pymongo/asynchronous/command_cursor.py +++ b/pymongo/asynchronous/command_cursor.py @@ -162,7 +162,7 @@ async def _send_message(self, operation: _GetMore) -> None: client = self._collection.database.client try: response = await client._run_operation( - operation, self._unpack_response, address=self._address + operation, self._run_with_conn, address=self._address ) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS: diff --git a/pymongo/asynchronous/cursor.py b/pymongo/asynchronous/cursor.py index 4a4fdf43ff..91b65cccf4 100644 --- a/pymongo/asynchronous/cursor.py +++ b/pymongo/asynchronous/cursor.py @@ -980,7 +980,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None: try: response = await client._run_operation( - operation, self._unpack_response, address=self._address + operation, self._run_with_conn, address=self._address ) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust: diff --git a/pymongo/asynchronous/cursor_base.py b/pymongo/asynchronous/cursor_base.py index ce3114684a..fee9814870 100644 --- a/pymongo/asynchronous/cursor_base.py +++ b/pymongo/asynchronous/cursor_base.py @@ -16,20 +16,57 @@ from __future__ import annotations +import datetime from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot +from pymongo.asynchronous.command_runner import run_cursor_command +from pymongo.asynchronous.helpers import _handle_reauth from pymongo.cursor_shared import _AgnosticCursorBase from pymongo.lock import _async_create_lock -from pymongo.typings import _DocumentType +from pymongo.message import _GetMore, _OpMsg, _Query +from pymongo.response import PinnedResponse, Response +from pymongo.typings import _DocumentOut, _DocumentType if TYPE_CHECKING: from pymongo.asynchronous.client_session import AsyncClientSession from pymongo.asynchronous.pool import AsyncConnection + from pymongo.read_preferences import _ServerMode _IS_SYNC = False +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} + + +def _split_message( + message: Union[tuple[int, Any], tuple[int, Any, int]], +) -> tuple[int, Any, int]: + """Return request_id, data, max_doc_size. + + :param message: (request_id, data, max_doc_size) or (request_id, data) + """ + if len(message) == 3: + return message # type: ignore[return-value] + # get_more and kill_cursors messages don't include BSON documents. + request_id, data = message # type: ignore[misc] + return request_id, data, 0 + + +async def _operation_to_command( + operation: Union[_Query, _GetMore], + conn: AsyncConnection, + use_cmd: bool, +) -> tuple[dict[str, Any], str]: + cmd, db = operation.as_command(conn, use_cmd) + if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: + cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] + operation.db, cmd, operation.codec_options + ) + operation.update_command(cmd) + return cmd, db + class _ConnectionManager: """Used with exhaust cursors to ensure the connection is returned.""" @@ -66,6 +103,84 @@ def session(self) -> Optional[AsyncClientSession]: async def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] ... + @abstractmethod + def _unpack_response( + self, + response: _OpMsg, + cursor_id: Optional[int], + codec_options: Any, + user_fields: Optional[Mapping[str, Any]] = None, + legacy_response: bool = False, + ) -> Sequence[_DocumentOut]: ... + + @_handle_reauth + async def _run_with_conn( + self, + conn: AsyncConnection, + operation: Union[_Query, _GetMore], + read_preference: _ServerMode, + ) -> Response: + """Execute a cursor operation on the given connection and return a Response.""" + client = self._collection.database.client + use_cmd = operation.use_command(conn) + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) + cmd, dbn = await _operation_to_command(operation, conn, use_cmd) + if more_to_come: + request_id, data, max_doc_size = 0, b"", 0 + else: + message = operation.get_message(read_preference, conn, use_cmd) + request_id, data, max_doc_size = _split_message(message) + user_fields = _CURSOR_DOC_FIELDS if use_cmd else None + docs, reply, duration = await run_cursor_command( + conn, + cmd, + dbn, + request_id, + data, + client=client, + session=operation.session, # type: ignore[arg-type] + listeners=client._event_listeners, + address=conn.address, + start=datetime.datetime.now(), + codec_options=operation.codec_options, + user_fields=user_fields, + command_name=operation.name, + pool_opts=conn.opts, + max_doc_size=max_doc_size, + more_to_come=more_to_come, + unpack_res=self._unpack_response, + cursor_id=operation.cursor_id, + ) + assert reply is not None + if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] + conn.pin_cursor() + if isinstance(reply, _OpMsg): + # In OP_MSG, the server keeps sending only if the more_to_come flag is set. + more_to_come = reply.more_to_come + else: + # In OP_REPLY, the server keeps sending until cursor_id is 0. + more_to_come = bool(operation.exhaust and reply.cursor_id) + if operation.conn_mgr: + operation.conn_mgr.update_exhaust(more_to_come) + return PinnedResponse( + data=reply, + address=conn.address, + conn=conn, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + more_to_come=more_to_come, + ) + return Response( + data=reply, + address=conn.address, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + ) + async def _die_lock(self) -> None: """Closes this cursor.""" try: diff --git a/pymongo/asynchronous/mongo_client.py b/pymongo/asynchronous/mongo_client.py index 6aeea53f4c..aa0fa3666f 100644 --- a/pymongo/asynchronous/mongo_client.py +++ b/pymongo/asynchronous/mongo_client.py @@ -1906,13 +1906,14 @@ async def _conn_for_reads( async def _run_operation( self, operation: Union[_Query, _GetMore], - unpack_res: Callable, # type: ignore[type-arg] + execute_fn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param unpack_res: A callable that decodes the wire protocol response. + :param execute_fn: A callable ``(conn, operation, read_preference) -> Response`` + that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. """ @@ -1927,30 +1928,18 @@ async def _run_operation( async with operation.conn_mgr._lock: async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return await server.run_operation( - operation.conn_mgr.conn, - operation, - operation.read_preference, - self._event_listeners, - unpack_res, - self, + return await execute_fn( + operation.conn_mgr.conn, operation, operation.read_preference ) async def _cmd( _session: Optional[AsyncClientSession], - server: Server, + _server: Server, conn: AsyncConnection, read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return await server.run_operation( - conn, - operation, - read_preference, - self._event_listeners, - unpack_res, - self, - ) + return await execute_fn(conn, operation, read_preference) return await self._retryable_read( _cmd, diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 9a6984f486..0b19fd16a3 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -18,42 +18,31 @@ import logging from contextlib import AbstractAsyncContextManager -from datetime import datetime from typing import ( TYPE_CHECKING, Any, - Callable, Optional, - Union, ) -from pymongo.asynchronous.command_runner import run_cursor_command -from pymongo.asynchronous.helpers import _handle_reauth from pymongo.logger import ( _SDAM_LOGGER, _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _GetMore, _OpMsg, _Query -from pymongo.response import PinnedResponse, Response if TYPE_CHECKING: from queue import Queue from weakref import ReferenceType from bson.objectid import ObjectId - from pymongo.asynchronous.mongo_client import AsyncMongoClient, _MongoClientErrorHandler + from pymongo.asynchronous.mongo_client import _MongoClientErrorHandler from pymongo.asynchronous.monitor import Monitor from pymongo.asynchronous.pool import AsyncConnection, Pool from pymongo.monitoring import _EventListeners - from pymongo.read_preferences import _ServerMode from pymongo.server_description import ServerDescription - from pymongo.typings import _DocumentOut _IS_SYNC = False -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -118,115 +107,6 @@ def request_check(self) -> None: """Check the server's state soon.""" self._monitor.request_check() - async def operation_to_command( - self, operation: Union[_Query, _GetMore], conn: AsyncConnection, apply_timeout: bool = False - ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, apply_timeout) - # Support auto encryption - if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: - cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment] - operation.db, cmd, operation.codec_options - ) - operation.update_command(cmd) - - return cmd, db - - @_handle_reauth - async def run_operation( - self, - conn: AsyncConnection, - operation: Union[_Query, _GetMore], - read_preference: _ServerMode, - listeners: Optional[_EventListeners], - unpack_res: Callable[..., list[_DocumentOut]], - client: AsyncMongoClient[Any], - ) -> Response: - """Run a _Query or _GetMore operation and return a Response object. - - This method is used only to run _Query/_GetMore operations from - cursors. - Can raise ConnectionFailure, OperationFailure, etc. - - :param conn: An AsyncConnection instance. - :param operation: A _Query or _GetMore object. - :param read_preference: The read preference to use. - :param listeners: Instance of _EventListeners or None. - :param unpack_res: A callable that decodes the wire protocol response. - :param client: An AsyncMongoClient instance. - """ - assert listeners is not None - start = datetime.now() - - use_cmd = operation.use_command(conn) - more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) - cmd, dbn = await self.operation_to_command(operation, conn, use_cmd) - if more_to_come: - request_id = 0 - data = b"" - max_doc_size = 0 - else: - message = operation.get_message(read_preference, conn, use_cmd) - request_id, data, max_doc_size = self._split_message(message) - - user_fields = _CURSOR_DOC_FIELDS if use_cmd else None - - docs, reply, duration = await run_cursor_command( - conn, - cmd, - dbn, - request_id, - data, - client=client, - session=operation.session, # type: ignore[arg-type] - listeners=listeners, - address=conn.address, - start=start, - codec_options=operation.codec_options, - user_fields=user_fields, - command_name=operation.name, - pool_opts=conn.opts, - max_doc_size=max_doc_size, - more_to_come=more_to_come, - unpack_res=unpack_res, - cursor_id=operation.cursor_id, - ) - assert reply is not None - - response: Response - - if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] - conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) - if operation.conn_mgr: - operation.conn_mgr.update_exhaust(more_to_come) - response = PinnedResponse( - data=reply, - address=self._description.address, - conn=conn, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - more_to_come=more_to_come, - ) - else: - response = Response( - data=reply, - address=self._description.address, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - ) - - return response - async def checkout( self, handler: Optional[_MongoClientErrorHandler] = None ) -> AbstractAsyncContextManager[AsyncConnection]: @@ -245,19 +125,5 @@ def description(self, server_description: ServerDescription) -> None: def pool(self) -> Pool: return self._pool - def _split_message( - self, message: Union[tuple[int, Any], tuple[int, Any, int]] - ) -> tuple[int, Any, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - else: - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._description!r}>" diff --git a/pymongo/synchronous/command_cursor.py b/pymongo/synchronous/command_cursor.py index 82a9732bd9..8cea4cff65 100644 --- a/pymongo/synchronous/command_cursor.py +++ b/pymongo/synchronous/command_cursor.py @@ -161,9 +161,7 @@ def _send_message(self, operation: _GetMore) -> None: """Send a getmore message and handle the response.""" client = self._collection.database.client try: - response = client._run_operation( - operation, self._unpack_response, address=self._address - ) + response = client._run_operation(operation, self._run_with_conn, address=self._address) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/synchronous/cursor.py b/pymongo/synchronous/cursor.py index e1fdf2af0e..ea3f9a19ba 100644 --- a/pymongo/synchronous/cursor.py +++ b/pymongo/synchronous/cursor.py @@ -977,9 +977,7 @@ def _send_message(self, operation: Union[_Query, _GetMore]) -> None: raise InvalidOperation("exhaust cursors do not support auto encryption") try: - response = client._run_operation( - operation, self._unpack_response, address=self._address - ) + response = client._run_operation(operation, self._run_with_conn, address=self._address) except OperationFailure as exc: if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust: # Don't send killCursors because the cursor is already closed. diff --git a/pymongo/synchronous/cursor_base.py b/pymongo/synchronous/cursor_base.py index 96e69cb6ee..4806a65854 100644 --- a/pymongo/synchronous/cursor_base.py +++ b/pymongo/synchronous/cursor_base.py @@ -16,20 +16,57 @@ from __future__ import annotations +import datetime from abc import abstractmethod -from typing import TYPE_CHECKING, Any, Optional +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Optional, Union from pymongo import _csot from pymongo.cursor_shared import _AgnosticCursorBase from pymongo.lock import _create_lock -from pymongo.typings import _DocumentType +from pymongo.message import _GetMore, _OpMsg, _Query +from pymongo.response import PinnedResponse, Response +from pymongo.synchronous.command_runner import run_cursor_command +from pymongo.synchronous.helpers import _handle_reauth +from pymongo.typings import _DocumentOut, _DocumentType if TYPE_CHECKING: + from pymongo.read_preferences import _ServerMode from pymongo.synchronous.client_session import ClientSession from pymongo.synchronous.pool import Connection _IS_SYNC = True +_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} + + +def _split_message( + message: Union[tuple[int, Any], tuple[int, Any, int]], +) -> tuple[int, Any, int]: + """Return request_id, data, max_doc_size. + + :param message: (request_id, data, max_doc_size) or (request_id, data) + """ + if len(message) == 3: + return message # type: ignore[return-value] + # get_more and kill_cursors messages don't include BSON documents. + request_id, data = message # type: ignore[misc] + return request_id, data, 0 + + +def _operation_to_command( + operation: Union[_Query, _GetMore], + conn: Connection, + use_cmd: bool, +) -> tuple[dict[str, Any], str]: + cmd, db = operation.as_command(conn, use_cmd) + if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: + cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] + operation.db, cmd, operation.codec_options + ) + operation.update_command(cmd) + return cmd, db + class _ConnectionManager: """Used with exhaust cursors to ensure the connection is returned.""" @@ -66,6 +103,84 @@ def session(self) -> Optional[ClientSession]: def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg] ... + @abstractmethod + def _unpack_response( + self, + response: _OpMsg, + cursor_id: Optional[int], + codec_options: Any, + user_fields: Optional[Mapping[str, Any]] = None, + legacy_response: bool = False, + ) -> Sequence[_DocumentOut]: ... + + @_handle_reauth + def _run_with_conn( + self, + conn: Connection, + operation: Union[_Query, _GetMore], + read_preference: _ServerMode, + ) -> Response: + """Execute a cursor operation on the given connection and return a Response.""" + client = self._collection.database.client + use_cmd = operation.use_command(conn) + more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) + cmd, dbn = _operation_to_command(operation, conn, use_cmd) + if more_to_come: + request_id, data, max_doc_size = 0, b"", 0 + else: + message = operation.get_message(read_preference, conn, use_cmd) + request_id, data, max_doc_size = _split_message(message) + user_fields = _CURSOR_DOC_FIELDS if use_cmd else None + docs, reply, duration = run_cursor_command( + conn, + cmd, + dbn, + request_id, + data, + client=client, + session=operation.session, # type: ignore[arg-type] + listeners=client._event_listeners, + address=conn.address, + start=datetime.datetime.now(), + codec_options=operation.codec_options, + user_fields=user_fields, + command_name=operation.name, + pool_opts=conn.opts, + max_doc_size=max_doc_size, + more_to_come=more_to_come, + unpack_res=self._unpack_response, + cursor_id=operation.cursor_id, + ) + assert reply is not None + if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] + conn.pin_cursor() + if isinstance(reply, _OpMsg): + # In OP_MSG, the server keeps sending only if the more_to_come flag is set. + more_to_come = reply.more_to_come + else: + # In OP_REPLY, the server keeps sending until cursor_id is 0. + more_to_come = bool(operation.exhaust and reply.cursor_id) + if operation.conn_mgr: + operation.conn_mgr.update_exhaust(more_to_come) + return PinnedResponse( + data=reply, + address=conn.address, + conn=conn, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + more_to_come=more_to_come, + ) + return Response( + data=reply, + address=conn.address, + duration=duration, + request_id=request_id, + from_command=use_cmd, + docs=docs, # type: ignore[arg-type] + ) + def _die_lock(self) -> None: """Closes this cursor.""" try: diff --git a/pymongo/synchronous/mongo_client.py b/pymongo/synchronous/mongo_client.py index 6b7c5d9c98..b6bb3baf79 100644 --- a/pymongo/synchronous/mongo_client.py +++ b/pymongo/synchronous/mongo_client.py @@ -1903,13 +1903,14 @@ def _conn_for_reads( def _run_operation( self, operation: Union[_Query, _GetMore], - unpack_res: Callable, # type: ignore[type-arg] + execute_fn: Callable, # type: ignore[type-arg] address: Optional[_Address] = None, ) -> Response: """Run a _Query/_GetMore operation and return a Response. :param operation: a _Query or _GetMore object. - :param unpack_res: A callable that decodes the wire protocol response. + :param execute_fn: A callable ``(conn, operation, read_preference) -> Response`` + that executes the operation on a given connection. :param address: Optional address when sending a message to a specific server, used for getMore. """ @@ -1924,30 +1925,16 @@ def _run_operation( with operation.conn_mgr._lock: with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type] err_handler.contribute_socket(operation.conn_mgr.conn) - return server.run_operation( - operation.conn_mgr.conn, - operation, - operation.read_preference, - self._event_listeners, - unpack_res, - self, - ) + return execute_fn(operation.conn_mgr.conn, operation, operation.read_preference) def _cmd( _session: Optional[ClientSession], - server: Server, + _server: Server, conn: Connection, read_preference: _ServerMode, ) -> Response: operation.reset() # Reset op in case of retry. - return server.run_operation( - conn, - operation, - read_preference, - self._event_listeners, - unpack_res, - self, - ) + return execute_fn(conn, operation, read_preference) return self._retryable_read( _cmd, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 7aa017134a..8b0ad4bcb0 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -18,13 +18,10 @@ import logging from contextlib import AbstractContextManager -from datetime import datetime from typing import ( TYPE_CHECKING, Any, - Callable, Optional, - Union, ) from pymongo.logger import ( @@ -32,10 +29,6 @@ _debug_log, _SDAMStatusMessage, ) -from pymongo.message import _GetMore, _OpMsg, _Query -from pymongo.response import PinnedResponse, Response -from pymongo.synchronous.command_runner import run_cursor_command -from pymongo.synchronous.helpers import _handle_reauth if TYPE_CHECKING: from queue import Queue @@ -43,17 +36,13 @@ from bson.objectid import ObjectId from pymongo.monitoring import _EventListeners - from pymongo.read_preferences import _ServerMode from pymongo.server_description import ServerDescription - from pymongo.synchronous.mongo_client import MongoClient, _MongoClientErrorHandler + from pymongo.synchronous.mongo_client import _MongoClientErrorHandler from pymongo.synchronous.monitor import Monitor from pymongo.synchronous.pool import Connection, Pool - from pymongo.typings import _DocumentOut _IS_SYNC = True -_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}} - class Server: def __init__( @@ -118,115 +107,6 @@ def request_check(self) -> None: """Check the server's state soon.""" self._monitor.request_check() - def operation_to_command( - self, operation: Union[_Query, _GetMore], conn: Connection, apply_timeout: bool = False - ) -> tuple[dict[str, Any], str]: - cmd, db = operation.as_command(conn, apply_timeout) - # Support auto encryption - if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption: - cmd = operation.client._encrypter.encrypt( # type: ignore[misc, assignment] - operation.db, cmd, operation.codec_options - ) - operation.update_command(cmd) - - return cmd, db - - @_handle_reauth - def run_operation( - self, - conn: Connection, - operation: Union[_Query, _GetMore], - read_preference: _ServerMode, - listeners: Optional[_EventListeners], - unpack_res: Callable[..., list[_DocumentOut]], - client: MongoClient[Any], - ) -> Response: - """Run a _Query or _GetMore operation and return a Response object. - - This method is used only to run _Query/_GetMore operations from - cursors. - Can raise ConnectionFailure, OperationFailure, etc. - - :param conn: A Connection instance. - :param operation: A _Query or _GetMore object. - :param read_preference: The read preference to use. - :param listeners: Instance of _EventListeners or None. - :param unpack_res: A callable that decodes the wire protocol response. - :param client: A MongoClient instance. - """ - assert listeners is not None - start = datetime.now() - - use_cmd = operation.use_command(conn) - more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) - cmd, dbn = self.operation_to_command(operation, conn, use_cmd) - if more_to_come: - request_id = 0 - data = b"" - max_doc_size = 0 - else: - message = operation.get_message(read_preference, conn, use_cmd) - request_id, data, max_doc_size = self._split_message(message) - - user_fields = _CURSOR_DOC_FIELDS if use_cmd else None - - docs, reply, duration = run_cursor_command( - conn, - cmd, - dbn, - request_id, - data, - client=client, - session=operation.session, # type: ignore[arg-type] - listeners=listeners, - address=conn.address, - start=start, - codec_options=operation.codec_options, - user_fields=user_fields, - command_name=operation.name, - pool_opts=conn.opts, - max_doc_size=max_doc_size, - more_to_come=more_to_come, - unpack_res=unpack_res, - cursor_id=operation.cursor_id, - ) - assert reply is not None - - response: Response - - if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type] - conn.pin_cursor() - if isinstance(reply, _OpMsg): - # In OP_MSG, the server keeps sending only if the - # more_to_come flag is set. - more_to_come = reply.more_to_come - else: - # In OP_REPLY, the server keeps sending until cursor_id is 0. - more_to_come = bool(operation.exhaust and reply.cursor_id) - if operation.conn_mgr: - operation.conn_mgr.update_exhaust(more_to_come) - response = PinnedResponse( - data=reply, - address=self._description.address, - conn=conn, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - more_to_come=more_to_come, - ) - else: - response = Response( - data=reply, - address=self._description.address, - duration=duration, - request_id=request_id, - from_command=use_cmd, - docs=docs, # type: ignore[arg-type] - ) - - return response - def checkout( self, handler: Optional[_MongoClientErrorHandler] = None ) -> AbstractContextManager[Connection]: @@ -245,19 +125,5 @@ def description(self, server_description: ServerDescription) -> None: def pool(self) -> Pool: return self._pool - def _split_message( - self, message: Union[tuple[int, Any], tuple[int, Any, int]] - ) -> tuple[int, Any, int]: - """Return request_id, data, max_doc_size. - - :param message: (request_id, data, max_doc_size) or (request_id, data) - """ - if len(message) == 3: - return message # type: ignore[return-value] - else: - # get_more and kill_cursors messages don't include BSON documents. - request_id, data = message # type: ignore[misc] - return request_id, data, 0 - def __repr__(self) -> str: return f"<{self.__class__.__name__} {self._description!r}>" diff --git a/test/asynchronous/test_client.py b/test/asynchronous/test_client.py index 5da186931a..0a65ff70b7 100644 --- a/test/asynchronous/test_client.py +++ b/test/asynchronous/test_client.py @@ -1676,7 +1676,7 @@ async def test_stale_getmore(self): False, None, ), - unpack_res=AsyncCursor(client.pymongo_test.collection)._unpack_response, + execute_fn=AsyncCursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), ) diff --git a/test/test_client.py b/test/test_client.py index b37b5e57ac..a32de9db72 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -1633,7 +1633,7 @@ def test_stale_getmore(self): False, None, ), - unpack_res=Cursor(client.pymongo_test.collection)._unpack_response, + execute_fn=Cursor(client.pymongo_test.collection)._run_with_conn, address=("not-a-member", 27017), )