From 2f670ad01e35b15cf1e42bed7f81a20788c51fac Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 23 Feb 2026 15:06:32 +0100 Subject: [PATCH 1/6] chore(telemetry): add configurations to extended heartbeats --- ddtrace/internal/settings/_telemetry.py | 2 ++ ddtrace/internal/telemetry/writer.py | 45 ++++++++++++++----------- tests/telemetry/test_telemetry.py | 28 +++++++++++++++ 3 files changed, 55 insertions(+), 20 deletions(-) diff --git a/ddtrace/internal/settings/_telemetry.py b/ddtrace/internal/settings/_telemetry.py index 42f8411305d..6942f1ebc1f 100644 --- a/ddtrace/internal/settings/_telemetry.py +++ b/ddtrace/internal/settings/_telemetry.py @@ -23,6 +23,8 @@ class TelemetryConfig(DDConfig): INSTALL_TIME = DDConfig.v(t.Optional[str], "instrumentation.install_time", default=None) FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True) LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True) + # Interval should be fixed to 24 hours. The value is should not be overridden in tests. + EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v(float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0) config = TelemetryConfig() diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 30def852f28..e3e20d9b349 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -161,14 +161,14 @@ def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) - self._logs: Set[Dict[str, Any]] = set() self._forked: bool = False self._events_queue: List[Dict[str, Any]] = [] - self._configuration_queue: List[Dict] = [] + self._queued_configs: List[Dict] = [] + self._sent_configs: List[Dict] = [] self._imported_dependencies: Dict[str, str] = dict() self._modules_already_imported: Set[str] = set() self._product_enablement: Dict[str, bool] = {product.value: False for product in TELEMETRY_APM_PRODUCT} self._previous_product_enablement: Dict[str, bool] = {} self._extended_time = time.monotonic() - # The extended heartbeat interval is set to 24 hours - self._extended_heartbeat_interval = 3600 * 24 + self._extended_heartbeat_interval = config.EXTENDED_HEARTBEAT_INTERVAL self.started = False @@ -301,15 +301,22 @@ def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[Di } return payload - def _report_heartbeat(self) -> Optional[Dict[str, Any]]: - if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: - self._extended_time += self._extended_heartbeat_interval - return { - "dependencies": [ + def _report_heartbeat(self) -> Dict[str, Any]: + """Report a heartbeat to keep RC connections alive. + + Extended heartbeats (non-empty payload) include configurations and dependencies; + regular heartbeats return an empty payload. Callers should queue this after + configuration and dependencies events so values are accurately reported. + """ + payload = {} + if time.monotonic() - self._extended_time > self._extended_heartbeat_interval: + payload["configurations"] = self._sent_configs + if config.DEPENDENCY_COLLECTION: + payload["dependencies"] = [ {"name": name, "version": version} for name, version in self._imported_dependencies.items() ] - } - return None + self._extended_time += self._extended_heartbeat_interval + return payload def _report_integrations(self) -> List[Dict]: """Flushes and returns a list of all queued integrations""" @@ -321,8 +328,9 @@ def _report_integrations(self) -> List[Dict]: def _report_configurations(self) -> List[Dict]: """Flushes and returns a list of all queued configurations""" with self._service_lock: - configurations = self._configuration_queue - self._configuration_queue = [] + configurations = self._queued_configs + self._sent_configs.extend(configurations) + self._queued_configs = [] return configurations def _report_dependencies(self) -> Optional[List[Dict[str, Any]]]: @@ -391,13 +399,13 @@ def add_configuration( with self._service_lock: config["seq_id"] = next(self._sequence_configurations) - self._configuration_queue.append(config) + self._queued_configs.append(config) def add_configurations(self, configuration_list: List[Tuple[str, str, str]]) -> None: """Creates and queues a list of configurations""" with self._service_lock: for name, value, origin in configuration_list: - self._configuration_queue.append( + self._queued_configs.append( { "name": name, "origin": origin, @@ -636,14 +644,10 @@ def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> No if shutting_down and not self._forked: events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.SHUTDOWN)) - # Always include a heartbeat to keep RC connections alive - # Extended heartbeat should be queued after app-dependencies-loaded event. This - # ensures that that imported dependencies are accurately reported. if heartbeat_payload := self._report_heartbeat(): - # Extended heartbeat report dependencies while regular heartbeats report empty payloads events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT)) else: - events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.HEARTBEAT)) + events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.HEARTBEAT)) # Get any queued events (ie metrics and logs from previous periodic calls) and combine with current batch if queued_events := self._report_events(): @@ -677,7 +681,8 @@ def reset_queues(self) -> None: self._namespace.flush() self._logs = set() self._imported_dependencies = {} - self._configuration_queue = [] + self._queued_configs = [] + self._sent_configs = [] def _report_events(self) -> List[Dict]: """Flushes and returns a list of all telemtery event""" diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 9a439f5a6e3..b76fcca1a0c 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -381,3 +381,31 @@ def test_telemetry_multiple_sources(test_agent_session, run_python_code_in_subpr assert sorted_configs[3]["value"] is True assert sorted_configs[3]["origin"] == "code" + + +@pytest.mark.parametrize("collect_dependencies", [True, False]) +def test_extended_heartbeat_sent(collect_dependencies, ddtrace_run_python_code_in_subprocess, test_agent_session): + """Assert at least one extended heartbeat is sent after sleeping 1.5 seconds.""" + + env = os.environ.copy() + env["DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL"] = "1" + env["DD_TELEMETRY_LOG_COLLECTION_ENABLED"] = "0.1" + env["DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED"] = str(collect_dependencies) + env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" + + _, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import time; time.sleep(1.5)", env=env) + assert status == 0, stderr + assert stderr == b"" + + extended_events = test_agent_session.get_events("app-extended-heartbeat") + assert len(extended_events) >= 1 + + assert extended_events[0]["payload"]["configurations"] is not None + configurations = test_agent_session.get_configurations() + assert configurations == extended_events[0]["payload"]["configurations"] + + if collect_dependencies: + assert "dependencies" in extended_events[0]["payload"] + assert len(extended_events[0]["payload"]["dependencies"]) > 0, extended_events[0]["payload"] + else: + assert "dependencies" not in extended_events[0]["payload"] From f85e7a6655f1fff30c0f45eb7610cec75938a457 Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 23 Feb 2026 15:14:20 +0100 Subject: [PATCH 2/6] fix comment --- ddtrace/internal/settings/_telemetry.py | 2 +- tests/telemetry/test_telemetry.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ddtrace/internal/settings/_telemetry.py b/ddtrace/internal/settings/_telemetry.py index 6942f1ebc1f..21441ad3d52 100644 --- a/ddtrace/internal/settings/_telemetry.py +++ b/ddtrace/internal/settings/_telemetry.py @@ -24,7 +24,7 @@ class TelemetryConfig(DDConfig): FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True) LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True) # Interval should be fixed to 24 hours. The value is should not be overridden in tests. - EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v(float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0) + EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v(float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True) config = TelemetryConfig() diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index b76fcca1a0c..4d9774817b5 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -385,10 +385,10 @@ def test_telemetry_multiple_sources(test_agent_session, run_python_code_in_subpr @pytest.mark.parametrize("collect_dependencies", [True, False]) def test_extended_heartbeat_sent(collect_dependencies, ddtrace_run_python_code_in_subprocess, test_agent_session): - """Assert at least one extended heartbeat is sent after sleeping 1.5 seconds.""" + """Assert at least one extended heartbeat is sent when the extended heartbeat interval has elapsed.""" env = os.environ.copy() - env["DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL"] = "1" + env["_DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL"] = "1" env["DD_TELEMETRY_LOG_COLLECTION_ENABLED"] = "0.1" env["DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED"] = str(collect_dependencies) env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" From c931c0231784a1cd6bc48957af080ee7b34ffb30 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Mon, 23 Feb 2026 15:46:44 +0100 Subject: [PATCH 3/6] Apply suggestions from code review --- ddtrace/internal/settings/_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/internal/settings/_telemetry.py b/ddtrace/internal/settings/_telemetry.py index 21441ad3d52..bee8c16e20f 100644 --- a/ddtrace/internal/settings/_telemetry.py +++ b/ddtrace/internal/settings/_telemetry.py @@ -23,7 +23,7 @@ class TelemetryConfig(DDConfig): INSTALL_TIME = DDConfig.v(t.Optional[str], "instrumentation.install_time", default=None) FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True) LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True) - # Interval should be fixed to 24 hours. The value is should not be overridden in tests. + # Interval should be fixed to 24 hours. The value is should only be overridden in tests. EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v(float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True) From de4c3174f3bb89b88a8f3d50e4e476cb8fb9f8df Mon Sep 17 00:00:00 2001 From: Munir Date: Mon, 23 Feb 2026 17:13:33 +0100 Subject: [PATCH 4/6] fmt --- ddtrace/internal/settings/_telemetry.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ddtrace/internal/settings/_telemetry.py b/ddtrace/internal/settings/_telemetry.py index bee8c16e20f..c08802818dd 100644 --- a/ddtrace/internal/settings/_telemetry.py +++ b/ddtrace/internal/settings/_telemetry.py @@ -24,7 +24,9 @@ class TelemetryConfig(DDConfig): FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True) LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True) # Interval should be fixed to 24 hours. The value is should only be overridden in tests. - EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v(float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True) + EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v( + float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True + ) config = TelemetryConfig() From 2b3f8100529bc1ee167fbae52c6c3a9b3570fef6 Mon Sep 17 00:00:00 2001 From: Munir Date: Tue, 24 Feb 2026 10:20:16 +0100 Subject: [PATCH 5/6] lint --- ddtrace/internal/telemetry/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 6d120c01529..86fb427ba55 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -297,7 +297,7 @@ def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[di } return payload - def _report_heartbeat(self) -> Optional[dict[str, Any]]: + def _report_heartbeat(self) -> dict[str, Any]: """Report a heartbeat to keep RC connections alive. Extended heartbeats (non-empty payload) include configurations and dependencies; From 190aef798c542d68b81619923aa623baa537e7f9 Mon Sep 17 00:00:00 2001 From: Munir Abdinur Date: Fri, 27 Feb 2026 02:05:42 +0100 Subject: [PATCH 6/6] Update ddtrace/internal/settings/_telemetry.py Co-authored-by: Rachel Yang --- ddtrace/internal/settings/_telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/internal/settings/_telemetry.py b/ddtrace/internal/settings/_telemetry.py index c08802818dd..f3d7d15be33 100644 --- a/ddtrace/internal/settings/_telemetry.py +++ b/ddtrace/internal/settings/_telemetry.py @@ -23,7 +23,7 @@ class TelemetryConfig(DDConfig): INSTALL_TIME = DDConfig.v(t.Optional[str], "instrumentation.install_time", default=None) FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True) LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True) - # Interval should be fixed to 24 hours. The value is should only be overridden in tests. + # Interval should be fixed to 24 hours. The value should only be overridden in tests. EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v( float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True )