From 63a9768d927b6373efe870c6c4427a594c733d55 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 24 Jun 2026 11:14:52 -0500 Subject: [PATCH 1/4] PYTHON-5745 Consolidate command telemetry into _CommandTelemetry --- pymongo/_telemetry.py | 202 +++++++++++++++++++++++++ pymongo/asynchronous/command_runner.py | 146 +++--------------- pymongo/asynchronous/pool.py | 4 +- pymongo/asynchronous/server.py | 4 - pymongo/message.py | 3 - pymongo/pool_shared.py | 11 ++ pymongo/synchronous/command_runner.py | 146 +++--------------- pymongo/synchronous/pool.py | 4 +- pymongo/synchronous/server.py | 4 - 9 files changed, 259 insertions(+), 265 deletions(-) create mode 100644 pymongo/_telemetry.py diff --git a/pymongo/_telemetry.py b/pymongo/_telemetry.py new file mode 100644 index 0000000000..969a0a51f5 --- /dev/null +++ b/pymongo/_telemetry.py @@ -0,0 +1,202 @@ +# Copyright 2015-present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Internal helpers combining structured logging with APM event publishing.""" + +from __future__ import annotations + +import datetime +import logging +from collections.abc import MutableMapping +from typing import TYPE_CHECKING, Any, Optional + +from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log +from pymongo.pool_shared import _ConnectionTelemetryInfo + +if TYPE_CHECKING: + from bson.objectid import ObjectId + from pymongo.monitoring import _EventListeners + from pymongo.typings import _DocumentOut + + +class _CommandTelemetry: + """Combines structured logging and APM event publishing for a single command. + + Construct once per command, call :meth:`started` before the network send, + then call :meth:`succeeded` or :meth:`failed` when the outcome is known. + Duration is measured from the :meth:`started` call. + """ + + __slots__ = ( + "_cmd", + "_conn", + "_dbname", + "_duration", + "_listeners", + "_name", + "_op_id", + "_publish", + "_request_id", + "_should_log", + "_start", + "_topology_id", + ) + + def __init__( + self, + topology_id: Optional[ObjectId], + conn: _ConnectionTelemetryInfo, + listeners: Optional[_EventListeners], + cmd: MutableMapping[str, Any], + dbname: str, + request_id: int, + op_id: Optional[int], + ) -> None: + self._topology_id = topology_id + self._should_log = topology_id is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG) + self._publish = listeners is not None and listeners.enabled_for_commands + self._listeners = listeners + self._conn = conn + self._cmd = cmd + self._name = next(iter(cmd)) + self._dbname = dbname + self._request_id = request_id + self._op_id = op_id + self._start: datetime.datetime + self._duration: datetime.timedelta + + def started(self, orig: MutableMapping[str, Any], ensure_db: bool) -> None: + """Emit the STARTED log entry and APM event, and start the duration clock.""" + self._start = datetime.datetime.now() + if self._should_log: + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.STARTED, + clientId=self._topology_id, + command=self._cmd, + commandName=self._name, + databaseName=self._dbname, + requestId=self._request_id, + operationId=self._request_id, + driverConnectionId=self._conn.id, + serverConnectionId=self._conn.server_connection_id, + serverHost=self._conn.address[0], + serverPort=self._conn.address[1], + serviceId=self._conn.service_id, + ) + if self._publish: + assert self._listeners is not None + if ensure_db and "$db" not in orig: + orig["$db"] = self._dbname + self._listeners.publish_command_start( + orig, + self._dbname, + self._request_id, + self._conn.address, + self._conn.server_connection_id, + self._op_id, + service_id=self._conn.service_id, + ) + + @property + def duration(self) -> datetime.timedelta: + """Duration from :meth:`started` to :meth:`succeeded` or :meth:`failed`.""" + return self._duration + + def succeeded( + self, + reply: _DocumentOut, + command_name: str, + speculative_hello: bool, + ) -> None: + """Emit the SUCCEEDED log entry and APM event.""" + self._duration = datetime.datetime.now() - self._start + if not self._should_log and not self._publish: + return + duration = self._duration + if self._should_log: + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.SUCCEEDED, + clientId=self._topology_id, + durationMS=duration, + reply=reply, + commandName=self._name, + databaseName=self._dbname, + requestId=self._request_id, + operationId=self._request_id, + driverConnectionId=self._conn.id, + serverConnectionId=self._conn.server_connection_id, + serverHost=self._conn.address[0], + serverPort=self._conn.address[1], + serviceId=self._conn.service_id, + speculative_authenticate=speculative_hello, + ) + if self._publish: + assert self._listeners is not None + self._listeners.publish_command_success( + duration, + reply, + command_name, + self._request_id, + self._conn.address, + self._conn.server_connection_id, + self._op_id, + service_id=self._conn.service_id, + speculative_hello=speculative_hello, + database_name=self._dbname, + ) + + def failed( + self, + failure: _DocumentOut, + command_name: str, + is_server_side_error: bool, + ) -> None: + """Emit the FAILED log entry and APM event.""" + self._duration = datetime.datetime.now() - self._start + if not self._should_log and not self._publish: + return + duration = self._duration + if self._should_log: + _debug_log( + _COMMAND_LOGGER, + message=_CommandStatusMessage.FAILED, + clientId=self._topology_id, + durationMS=duration, + failure=failure, + commandName=self._name, + databaseName=self._dbname, + requestId=self._request_id, + operationId=self._request_id, + driverConnectionId=self._conn.id, + serverConnectionId=self._conn.server_connection_id, + serverHost=self._conn.address[0], + serverPort=self._conn.address[1], + serviceId=self._conn.service_id, + isServerSideError=is_server_side_error, + ) + if self._publish: + assert self._listeners is not None + self._listeners.publish_command_failure( + duration, + failure, + command_name, + self._request_id, + self._conn.address, + self._conn.server_connection_id, + self._op_id, + service_id=self._conn.service_id, + database_name=self._dbname, + ) diff --git a/pymongo/asynchronous/command_runner.py b/pymongo/asynchronous/command_runner.py index b663893a7b..7e15dd17e0 100644 --- a/pymongo/asynchronous/command_runner.py +++ b/pymongo/asynchronous/command_runner.py @@ -36,7 +36,6 @@ from __future__ import annotations import datetime -import logging from collections.abc import Mapping, MutableMapping, Sequence from typing import ( TYPE_CHECKING, @@ -49,14 +48,15 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message +from pymongo._telemetry import _CommandTelemetry from pymongo.compression_support import _NO_COMPRESSION from pymongo.errors import NotPrimaryError, OperationFailure -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _BulkWriteContextBase, _convert_exception, _OpMsg from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: from bson import CodecOptions + from bson.objectid import ObjectId from pymongo.asynchronous.client_session import AsyncClientSession from pymongo.asynchronous.mongo_client import AsyncMongoClient from pymongo.asynchronous.pool import AsyncConnection @@ -65,7 +65,7 @@ from pymongo.pool_options import PoolOptions from pymongo.read_concern import ReadConcern from pymongo.read_preferences import _ServerMode - from pymongo.typings import _Address, _CollationIn, _DocumentOut, _DocumentType + from pymongo.typings import _CollationIn, _DocumentOut, _DocumentType from pymongo.write_concern import WriteConcern _IS_SYNC = False @@ -81,8 +81,7 @@ async def _run_command( client: Optional[AsyncMongoClient[Any]], session: Optional[AsyncClientSession], listeners: Optional[_EventListeners], - address: Optional[_Address], - start: datetime.datetime, + topology_id: Optional[ObjectId], codec_options: CodecOptions[_DocumentType], user_fields: Optional[Mapping[str, Any]] = None, orig: Optional[MutableMapping[str, Any]] = None, @@ -118,12 +117,12 @@ async def _run_command( :param request_id: The request id of the encoded message (``0`` when ``more_to_come`` and no message is sent). :param msg: The encoded bytes to send (ignored when ``more_to_come``). - :param client: The AsyncMongoClient, for ``$clusterTime`` gossip, logging, - and decryption. ``None`` disables those steps (e.g. during handshake). + :param client: The AsyncMongoClient, for ``$clusterTime`` gossip and + decryption. ``None`` disables those steps (e.g. during handshake). :param session: The session to update from the response. :param listeners: The event listeners, or ``None`` to disable APM. - :param address: The (host, port) of ``conn`` for APM events. - :param start: The ``datetime`` the operation began, for duration timing. + :param topology_id: The client topology id for structured logging, or + ``None`` to disable command logging. :param codec_options: The CodecOptions used to decode the reply. :param user_fields: Response fields decoded with the codec's TypeDecoders. :param orig: The command document published in the ``STARTED`` APM event; @@ -159,38 +158,9 @@ async def _run_command( command_name = name if orig is None: orig = cmd - publish = listeners is not None and listeners.enabled_for_commands - - if client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=name, - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - assert address is not None - if ensure_db and "$db" not in orig: - orig["$db"] = dbname - listeners.publish_command_start( - orig, - dbname, - request_id, - address, - conn.server_connection_id, - op_id, - service_id=conn.service_id, - ) + + telemetry = _CommandTelemetry(topology_id, conn, listeners, cmd, dbname, request_id, op_id) + telemetry.started(orig, ensure_db) reply: Optional[_OpMsg] = None docs: list[dict[str, Any]] = [{"ok": 1}] @@ -234,80 +204,14 @@ async def _run_command( pool_opts=pool_opts, ) except Exception as exc: - duration = datetime.datetime.now() - start if isinstance(exc, (NotPrimaryError, OperationFailure)): failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=name, - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_failure( - duration, - failure, - command_name, - request_id, - address, - conn.server_connection_id, - op_id, - service_id=conn.service_id, - database_name=dbname, - ) + telemetry.failed(failure, command_name, isinstance(exc, OperationFailure)) raise - duration = datetime.datetime.now() - start - published_reply = docs[0] - if client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=published_reply, - commandName=name, - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_success( - duration, - published_reply, - command_name, - request_id, - address, - conn.server_connection_id, - op_id, - service_id=conn.service_id, - speculative_hello=speculative_hello, - database_name=dbname, - ) + telemetry.succeeded(docs[0], command_name, speculative_hello) if client and client._encrypter and reply and decrypt_reply: decrypted = await client._encrypter.decrypt(reply.raw_command_response()) @@ -315,7 +219,7 @@ async def _run_command( "list[dict[str, Any]]", _decode_all_selective(decrypted, codec_options, user_fields) ) - return docs, reply, duration + return docs, reply, telemetry.duration async def run_bulk_write_command( @@ -341,6 +245,7 @@ async def run_bulk_write_command( :param max_doc_size: The largest document size in the batch, passed to ``conn.send_message``. :param unacknowledged: When ``True``, send only and fake an ``{"ok": 1}`` reply. """ + topology_id = client._topology_settings._topology_id if client is not None else None return await _run_command( bwc.conn, # type: ignore[arg-type] cmd, @@ -350,8 +255,7 @@ async def run_bulk_write_command( client=client, session=bwc.session, # type: ignore[arg-type] listeners=bwc.listeners, - address=bwc.conn.address, # type: ignore[union-attr] - start=bwc.start_time, + topology_id=topology_id, codec_options=bwc.codec, op_id=bwc.op_id, command_name=bwc.name, @@ -372,8 +276,6 @@ async def run_cursor_command( client: Optional[AsyncMongoClient[Any]], session: Optional[AsyncClientSession], listeners: Optional[_EventListeners], - address: Optional[_Address], - start: datetime.datetime, codec_options: CodecOptions[_DocumentType], command_name: str, user_fields: Optional[Mapping[str, Any]] = None, @@ -393,8 +295,6 @@ async def run_cursor_command( :param client: The AsyncMongoClient, for ``$clusterTime`` gossip and logging. :param session: The session to update from the response. :param listeners: The event listeners, or ``None`` to disable APM. - :param address: The (host, port) of ``conn`` for APM events. - :param start: The ``datetime`` the operation began, for duration timing. :param codec_options: The CodecOptions used to decode the reply. :param command_name: The command name for APM events. :param user_fields: Response fields decoded with the codec's TypeDecoders. @@ -405,6 +305,7 @@ async def run_cursor_command( reply's own ``unpack_response`` is used. :param cursor_id: The cursor id passed to ``unpack_res``. """ + topology_id = client._topology_settings._topology_id if client is not None else None return await _run_command( conn, cmd, @@ -414,8 +315,7 @@ async def run_cursor_command( client=client, session=session, listeners=listeners, - address=address, - start=start, + topology_id=topology_id, codec_options=codec_options, user_fields=user_fields, command_name=command_name, @@ -438,7 +338,6 @@ async def run_command( client: Optional[AsyncMongoClient[Any]], check: bool = True, allowable_errors: Optional[Sequence[Union[str, int]]] = None, - address: Optional[_Address] = None, listeners: Optional[_EventListeners] = None, max_bson_size: Optional[int] = None, read_concern: Optional[ReadConcern] = None, @@ -465,7 +364,6 @@ async def run_command( :param client: The AsyncMongoClient, for ``$clusterTime`` gossip and logging. :param check: Raise OperationFailure if there are errors. :param allowable_errors: Errors to ignore when ``check`` is True. - :param address: The (host, port) of ``conn`` for APM events. :param listeners: The event listeners, or ``None`` to disable APM. :param max_bson_size: The maximum encoded BSON size for this server. :param read_concern: The read concern for this command. @@ -492,9 +390,8 @@ async def run_command( if collation is not None: spec["collation"] = collation - publish = listeners is not None and listeners.enabled_for_commands - start = datetime.datetime.now() - if publish: + topology_id = client._topology_settings._topology_id if client is not None else None + if listeners is not None and listeners.enabled_for_commands: speculative_hello = _is_speculative_authenticate(name, spec) if compression_ctx and name.lower() in _NO_COMPRESSION: @@ -529,8 +426,7 @@ async def run_command( client=client, session=session, listeners=listeners, - address=address, - start=start, + topology_id=topology_id, codec_options=codec_options, user_fields=user_fields, orig=orig, diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index 60acb93fcd..4ed3b85dbf 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -81,6 +81,7 @@ SSLErrors, _CancellationContext, _configured_protocol_interface, + _ConnectionTelemetryInfo, _raise_connection_failure, ) from pymongo.read_preferences import ReadPreference @@ -109,7 +110,7 @@ _IS_SYNC = False -class AsyncConnection: +class AsyncConnection(_ConnectionTelemetryInfo): """Store a connection with some metadata. :param conn: a raw connection object @@ -405,7 +406,6 @@ async def command( client, check, allowable_errors, - self.address, listeners, self.max_bson_size, read_concern, diff --git a/pymongo/asynchronous/server.py b/pymongo/asynchronous/server.py index 9a6984f486..57158dfc44 100644 --- a/pymongo/asynchronous/server.py +++ b/pymongo/asynchronous/server.py @@ -18,7 +18,6 @@ import logging from contextlib import AbstractAsyncContextManager -from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -155,7 +154,6 @@ async def run_operation( :param client: An AsyncMongoClient instance. """ assert listeners is not None - start = datetime.now() use_cmd = operation.use_command(conn) more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) @@ -179,8 +177,6 @@ async def run_operation( client=client, session=operation.session, # type: ignore[arg-type] listeners=listeners, - address=conn.address, - start=start, codec_options=operation.codec_options, user_fields=user_fields, command_name=operation.name, diff --git a/pymongo/message.py b/pymongo/message.py index bcd2810895..2e3aa1dcbd 100644 --- a/pymongo/message.py +++ b/pymongo/message.py @@ -22,7 +22,6 @@ from __future__ import annotations -import datetime import random import struct from collections.abc import Iterable, Mapping, MutableMapping @@ -451,7 +450,6 @@ class _BulkWriteContextBase: "op_id", "op_type", "session", - "start_time", ) def __init__( @@ -471,7 +469,6 @@ def __init__( self.listeners = listeners self.name = cmd_name self.field = _FIELD_MAP[self.name] - self.start_time = datetime.datetime.now() self.session = session self.compress = bool(conn.compression_context) self.op_type = op_type diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index c97b0eb217..56adaeda9b 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -26,9 +26,11 @@ Any, NoReturn, Optional, + Protocol, Union, ) +from bson.objectid import ObjectId from pymongo import _csot from pymongo.asynchronous.helpers import _getaddrinfo from pymongo.errors import ( # type:ignore[attr-defined] @@ -48,6 +50,15 @@ from pymongo.typings import _Address +class _ConnectionTelemetryInfo(Protocol): + """Protocol for connection fields consumed by :class:`~pymongo._telemetry._CommandTelemetry`.""" + + id: int + server_connection_id: Optional[int] + address: tuple[str, int] + service_id: Optional[ObjectId] + + def _get_ssl_session(ssl_sock: Any) -> Optional[Any]: """Return the TLS session from an SSL socket, handling both PyOpenSSL and stdlib ssl.""" if hasattr(ssl_sock, "get_session"): diff --git a/pymongo/synchronous/command_runner.py b/pymongo/synchronous/command_runner.py index 7402451c7c..34a452efb6 100644 --- a/pymongo/synchronous/command_runner.py +++ b/pymongo/synchronous/command_runner.py @@ -36,7 +36,6 @@ from __future__ import annotations import datetime -import logging from collections.abc import Mapping, MutableMapping, Sequence from typing import ( TYPE_CHECKING, @@ -49,14 +48,15 @@ from bson import _decode_all_selective from pymongo import _csot, helpers_shared, message +from pymongo._telemetry import _CommandTelemetry from pymongo.compression_support import _NO_COMPRESSION from pymongo.errors import NotPrimaryError, OperationFailure -from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log from pymongo.message import _BulkWriteContextBase, _convert_exception, _OpMsg from pymongo.monitoring import _is_speculative_authenticate if TYPE_CHECKING: from bson import CodecOptions + from bson.objectid import ObjectId from pymongo.compression_support import SnappyContext, ZlibContext, ZstdContext from pymongo.monitoring import _EventListeners from pymongo.pool_options import PoolOptions @@ -65,7 +65,7 @@ from pymongo.synchronous.client_session import ClientSession from pymongo.synchronous.mongo_client import MongoClient from pymongo.synchronous.pool import Connection - from pymongo.typings import _Address, _CollationIn, _DocumentOut, _DocumentType + from pymongo.typings import _CollationIn, _DocumentOut, _DocumentType from pymongo.write_concern import WriteConcern _IS_SYNC = True @@ -81,8 +81,7 @@ def _run_command( client: Optional[MongoClient[Any]], session: Optional[ClientSession], listeners: Optional[_EventListeners], - address: Optional[_Address], - start: datetime.datetime, + topology_id: Optional[ObjectId], codec_options: CodecOptions[_DocumentType], user_fields: Optional[Mapping[str, Any]] = None, orig: Optional[MutableMapping[str, Any]] = None, @@ -118,12 +117,12 @@ def _run_command( :param request_id: The request id of the encoded message (``0`` when ``more_to_come`` and no message is sent). :param msg: The encoded bytes to send (ignored when ``more_to_come``). - :param client: The MongoClient, for ``$clusterTime`` gossip, logging, - and decryption. ``None`` disables those steps (e.g. during handshake). + :param client: The MongoClient, for ``$clusterTime`` gossip and + decryption. ``None`` disables those steps (e.g. during handshake). :param session: The session to update from the response. :param listeners: The event listeners, or ``None`` to disable APM. - :param address: The (host, port) of ``conn`` for APM events. - :param start: The ``datetime`` the operation began, for duration timing. + :param topology_id: The client topology id for structured logging, or + ``None`` to disable command logging. :param codec_options: The CodecOptions used to decode the reply. :param user_fields: Response fields decoded with the codec's TypeDecoders. :param orig: The command document published in the ``STARTED`` APM event; @@ -159,38 +158,9 @@ def _run_command( command_name = name if orig is None: orig = cmd - publish = listeners is not None and listeners.enabled_for_commands - - if client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.STARTED, - clientId=client._topology_settings._topology_id, - command=cmd, - commandName=name, - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - ) - if publish: - assert listeners is not None - assert address is not None - if ensure_db and "$db" not in orig: - orig["$db"] = dbname - listeners.publish_command_start( - orig, - dbname, - request_id, - address, - conn.server_connection_id, - op_id, - service_id=conn.service_id, - ) + + telemetry = _CommandTelemetry(topology_id, conn, listeners, cmd, dbname, request_id, op_id) + telemetry.started(orig, ensure_db) reply: Optional[_OpMsg] = None docs: list[dict[str, Any]] = [{"ok": 1}] @@ -234,80 +204,14 @@ def _run_command( pool_opts=pool_opts, ) except Exception as exc: - duration = datetime.datetime.now() - start if isinstance(exc, (NotPrimaryError, OperationFailure)): failure: _DocumentOut = exc.details # type: ignore[assignment] else: failure = _convert_exception(exc) - if client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.FAILED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - failure=failure, - commandName=name, - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - isServerSideError=isinstance(exc, OperationFailure), - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_failure( - duration, - failure, - command_name, - request_id, - address, - conn.server_connection_id, - op_id, - service_id=conn.service_id, - database_name=dbname, - ) + telemetry.failed(failure, command_name, isinstance(exc, OperationFailure)) raise - duration = datetime.datetime.now() - start - published_reply = docs[0] - if client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG): - _debug_log( - _COMMAND_LOGGER, - message=_CommandStatusMessage.SUCCEEDED, - clientId=client._topology_settings._topology_id, - durationMS=duration, - reply=published_reply, - commandName=name, - databaseName=dbname, - requestId=request_id, - operationId=request_id, - driverConnectionId=conn.id, - serverConnectionId=conn.server_connection_id, - serverHost=conn.address[0], - serverPort=conn.address[1], - serviceId=conn.service_id, - speculative_authenticate="speculativeAuthenticate" in orig, - ) - if publish: - assert listeners is not None - assert address is not None - listeners.publish_command_success( - duration, - published_reply, - command_name, - request_id, - address, - conn.server_connection_id, - op_id, - service_id=conn.service_id, - speculative_hello=speculative_hello, - database_name=dbname, - ) + telemetry.succeeded(docs[0], command_name, speculative_hello) if client and client._encrypter and reply and decrypt_reply: decrypted = client._encrypter.decrypt(reply.raw_command_response()) @@ -315,7 +219,7 @@ def _run_command( "list[dict[str, Any]]", _decode_all_selective(decrypted, codec_options, user_fields) ) - return docs, reply, duration + return docs, reply, telemetry.duration def run_bulk_write_command( @@ -341,6 +245,7 @@ def run_bulk_write_command( :param max_doc_size: The largest document size in the batch, passed to ``conn.send_message``. :param unacknowledged: When ``True``, send only and fake an ``{"ok": 1}`` reply. """ + topology_id = client._topology_settings._topology_id if client is not None else None return _run_command( bwc.conn, # type: ignore[arg-type] cmd, @@ -350,8 +255,7 @@ def run_bulk_write_command( client=client, session=bwc.session, # type: ignore[arg-type] listeners=bwc.listeners, - address=bwc.conn.address, # type: ignore[union-attr] - start=bwc.start_time, + topology_id=topology_id, codec_options=bwc.codec, op_id=bwc.op_id, command_name=bwc.name, @@ -372,8 +276,6 @@ def run_cursor_command( client: Optional[MongoClient[Any]], session: Optional[ClientSession], listeners: Optional[_EventListeners], - address: Optional[_Address], - start: datetime.datetime, codec_options: CodecOptions[_DocumentType], command_name: str, user_fields: Optional[Mapping[str, Any]] = None, @@ -393,8 +295,6 @@ def run_cursor_command( :param client: The MongoClient, for ``$clusterTime`` gossip and logging. :param session: The session to update from the response. :param listeners: The event listeners, or ``None`` to disable APM. - :param address: The (host, port) of ``conn`` for APM events. - :param start: The ``datetime`` the operation began, for duration timing. :param codec_options: The CodecOptions used to decode the reply. :param command_name: The command name for APM events. :param user_fields: Response fields decoded with the codec's TypeDecoders. @@ -405,6 +305,7 @@ def run_cursor_command( reply's own ``unpack_response`` is used. :param cursor_id: The cursor id passed to ``unpack_res``. """ + topology_id = client._topology_settings._topology_id if client is not None else None return _run_command( conn, cmd, @@ -414,8 +315,7 @@ def run_cursor_command( client=client, session=session, listeners=listeners, - address=address, - start=start, + topology_id=topology_id, codec_options=codec_options, user_fields=user_fields, command_name=command_name, @@ -438,7 +338,6 @@ def run_command( client: Optional[MongoClient[Any]], check: bool = True, allowable_errors: Optional[Sequence[Union[str, int]]] = None, - address: Optional[_Address] = None, listeners: Optional[_EventListeners] = None, max_bson_size: Optional[int] = None, read_concern: Optional[ReadConcern] = None, @@ -465,7 +364,6 @@ def run_command( :param client: The MongoClient, for ``$clusterTime`` gossip and logging. :param check: Raise OperationFailure if there are errors. :param allowable_errors: Errors to ignore when ``check`` is True. - :param address: The (host, port) of ``conn`` for APM events. :param listeners: The event listeners, or ``None`` to disable APM. :param max_bson_size: The maximum encoded BSON size for this server. :param read_concern: The read concern for this command. @@ -492,9 +390,8 @@ def run_command( if collation is not None: spec["collation"] = collation - publish = listeners is not None and listeners.enabled_for_commands - start = datetime.datetime.now() - if publish: + topology_id = client._topology_settings._topology_id if client is not None else None + if listeners is not None and listeners.enabled_for_commands: speculative_hello = _is_speculative_authenticate(name, spec) if compression_ctx and name.lower() in _NO_COMPRESSION: @@ -529,8 +426,7 @@ def run_command( client=client, session=session, listeners=listeners, - address=address, - start=start, + topology_id=topology_id, codec_options=codec_options, user_fields=user_fields, orig=orig, diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index b3929b674a..1006735444 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -78,6 +78,7 @@ SSLErrors, _CancellationContext, _configured_socket_interface, + _ConnectionTelemetryInfo, _raise_connection_failure, ) from pymongo.read_preferences import ReadPreference @@ -109,7 +110,7 @@ _IS_SYNC = True -class Connection: +class Connection(_ConnectionTelemetryInfo): """Store a connection with some metadata. :param conn: a raw connection object @@ -405,7 +406,6 @@ def command( client, check, allowable_errors, - self.address, listeners, self.max_bson_size, read_concern, diff --git a/pymongo/synchronous/server.py b/pymongo/synchronous/server.py index 7aa017134a..09d8fb75e1 100644 --- a/pymongo/synchronous/server.py +++ b/pymongo/synchronous/server.py @@ -18,7 +18,6 @@ import logging from contextlib import AbstractContextManager -from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -155,7 +154,6 @@ def run_operation( :param client: A MongoClient instance. """ assert listeners is not None - start = datetime.now() use_cmd = operation.use_command(conn) more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come) @@ -179,8 +177,6 @@ def run_operation( client=client, session=operation.session, # type: ignore[arg-type] listeners=listeners, - address=conn.address, - start=start, codec_options=operation.codec_options, user_fields=user_fields, command_name=operation.name, From d62b69f1716b3e384f63fc6e5c9241e4a2ea4281 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 24 Jun 2026 13:26:17 -0500 Subject: [PATCH 2/4] PYTHON-5745 Fix copyright year in _telemetry.py --- pymongo/_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymongo/_telemetry.py b/pymongo/_telemetry.py index 969a0a51f5..38e8ed17f9 100644 --- a/pymongo/_telemetry.py +++ b/pymongo/_telemetry.py @@ -1,4 +1,4 @@ -# Copyright 2015-present MongoDB, Inc. +# Copyright 2026-present MongoDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From a08f32678722e5d04e1215a579ed7084a91ff196 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 24 Jun 2026 13:32:20 -0500 Subject: [PATCH 3/4] PYTHON-5745 Guard ObjectId import with TYPE_CHECKING in pool_shared.py --- pymongo/pool_shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pymongo/pool_shared.py b/pymongo/pool_shared.py index 56adaeda9b..410ffd8189 100644 --- a/pymongo/pool_shared.py +++ b/pymongo/pool_shared.py @@ -30,7 +30,6 @@ Union, ) -from bson.objectid import ObjectId from pymongo import _csot from pymongo.asynchronous.helpers import _getaddrinfo from pymongo.errors import ( # type:ignore[attr-defined] @@ -46,6 +45,7 @@ SSLErrors = (PYSSLError, SSLError) if TYPE_CHECKING: + from bson.objectid import ObjectId from pymongo.pyopenssl_context import _sslConn from pymongo.typings import _Address From adfa86359994eeb5e023446debce297523bce29c Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Wed, 24 Jun 2026 13:35:53 -0500 Subject: [PATCH 4/4] PYTHON-5745 Fix speculative_hello computed regardless of APM state --- pymongo/asynchronous/command_runner.py | 4 +--- pymongo/synchronous/command_runner.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pymongo/asynchronous/command_runner.py b/pymongo/asynchronous/command_runner.py index 7e15dd17e0..a2631bb94b 100644 --- a/pymongo/asynchronous/command_runner.py +++ b/pymongo/asynchronous/command_runner.py @@ -378,7 +378,6 @@ async def run_command( :param write_concern: The write concern for this command. Applied via CSOT. """ name = next(iter(spec)) - speculative_hello = False # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec @@ -391,8 +390,7 @@ async def run_command( spec["collation"] = collation topology_id = client._topology_settings._topology_id if client is not None else None - if listeners is not None and listeners.enabled_for_commands: - speculative_hello = _is_speculative_authenticate(name, spec) + speculative_hello = _is_speculative_authenticate(name, spec) if compression_ctx and name.lower() in _NO_COMPRESSION: compression_ctx = None diff --git a/pymongo/synchronous/command_runner.py b/pymongo/synchronous/command_runner.py index 34a452efb6..fbf8574cad 100644 --- a/pymongo/synchronous/command_runner.py +++ b/pymongo/synchronous/command_runner.py @@ -378,7 +378,6 @@ def run_command( :param write_concern: The write concern for this command. Applied via CSOT. """ name = next(iter(spec)) - speculative_hello = False # Publish the original command document, perhaps with lsid and $clusterTime. orig = spec @@ -391,8 +390,7 @@ def run_command( spec["collation"] = collation topology_id = client._topology_settings._topology_id if client is not None else None - if listeners is not None and listeners.enabled_for_commands: - speculative_hello = _is_speculative_authenticate(name, spec) + speculative_hello = _is_speculative_authenticate(name, spec) if compression_ctx and name.lower() in _NO_COMPRESSION: compression_ctx = None