diff --git a/ddtrace/internal/settings/_telemetry.py b/ddtrace/internal/settings/_telemetry.py index 42f8411305d..f3d7d15be33 100644 --- a/ddtrace/internal/settings/_telemetry.py +++ b/ddtrace/internal/settings/_telemetry.py @@ -23,6 +23,10 @@ class TelemetryConfig(DDConfig): INSTALL_TIME = DDConfig.v(t.Optional[str], "instrumentation.install_time", default=None) FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True) LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True) + # Interval should be fixed to 24 hours. The value should only be overridden in tests. + EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v( + float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True + ) config = TelemetryConfig() diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 76bfe344afa..86fb427ba55 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -156,14 +156,15 @@ def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) - self._namespace = MetricNamespace() self._logs: set[dict[str, Any]] = set() self._events_queue: list[dict[str, Any]] = [] + self._queued_configs: list[dict] = [] + self._sent_configs: list[dict] = [] self._configuration_queue: list[dict] = [] self._imported_dependencies: dict[str, str] = dict() self._modules_already_imported: set[str] = set() self._product_enablement: dict[str, bool] = {product.value: False for product in TELEMETRY_APM_PRODUCT} self._previous_product_enablement: dict[str, bool] = {} self._extended_time = time.monotonic() - # The extended heartbeat interval is set to 24 hours - self._extended_heartbeat_interval = 3600 * 24 + self._extended_heartbeat_interval = config.EXTENDED_HEARTBEAT_INTERVAL self.started = False @@ -296,15 +297,22 @@ def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[di } return payload - def _report_heartbeat(self) -> Optional[dict[str, Any]]: - if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval: - self._extended_time += self._extended_heartbeat_interval - return { - "dependencies": [ + def _report_heartbeat(self) -> dict[str, Any]: + """Report a heartbeat to keep RC connections alive. + + Extended heartbeats (non-empty payload) include configurations and dependencies; + regular heartbeats return an empty payload. Callers should queue this after + configuration and dependencies events so values are accurately reported. + """ + payload = {} + if time.monotonic() - self._extended_time > self._extended_heartbeat_interval: + payload["configurations"] = self._sent_configs + if config.DEPENDENCY_COLLECTION: + payload["dependencies"] = [ {"name": name, "version": version} for name, version in self._imported_dependencies.items() ] - } - return None + self._extended_time += self._extended_heartbeat_interval + return payload def _report_integrations(self) -> list[dict]: """Flushes and returns a list of all queued integrations""" @@ -316,8 +324,9 @@ def _report_integrations(self) -> list[dict]: def _report_configurations(self) -> list[dict]: """Flushes and returns a list of all queued configurations""" with self._service_lock: - configurations = self._configuration_queue - self._configuration_queue = [] + configurations = self._queued_configs + self._sent_configs.extend(configurations) + self._queued_configs = [] return configurations def _report_dependencies(self) -> Optional[list[dict[str, Any]]]: @@ -386,13 +395,13 @@ def add_configuration( with self._service_lock: config["seq_id"] = next(self._sequence_configurations) - self._configuration_queue.append(config) + self._queued_configs.append(config) def add_configurations(self, configuration_list: list[tuple[str, str, str]]) -> None: """Creates and queues a list of configurations""" with self._service_lock: for name, value, origin in configuration_list: - self._configuration_queue.append( + self._queued_configs.append( { "name": name, "origin": origin, @@ -631,14 +640,10 @@ def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> No if shutting_down and not forksafe.is_fork_child(): events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.SHUTDOWN)) - # Always include a heartbeat to keep RC connections alive - # Extended heartbeat should be queued after app-dependencies-loaded event. This - # ensures that that imported dependencies are accurately reported. if heartbeat_payload := self._report_heartbeat(): - # Extended heartbeat report dependencies while regular heartbeats report empty payloads events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT)) else: - events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.HEARTBEAT)) + events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.HEARTBEAT)) # Get any queued events (ie metrics and logs from previous periodic calls) and combine with current batch if queued_events := self._report_events(): @@ -672,7 +677,8 @@ def reset_queues(self) -> None: self._namespace.flush() self._logs = set() self._imported_dependencies = {} - self._configuration_queue = [] + self._queued_configs = [] + self._sent_configs = [] def _report_events(self) -> list[dict]: """Flushes and returns a list of all telemtery event""" diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 9a439f5a6e3..4d9774817b5 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -381,3 +381,31 @@ def test_telemetry_multiple_sources(test_agent_session, run_python_code_in_subpr assert sorted_configs[3]["value"] is True assert sorted_configs[3]["origin"] == "code" + + +@pytest.mark.parametrize("collect_dependencies", [True, False]) +def test_extended_heartbeat_sent(collect_dependencies, ddtrace_run_python_code_in_subprocess, test_agent_session): + """Assert at least one extended heartbeat is sent when the extended heartbeat interval has elapsed.""" + + env = os.environ.copy() + env["_DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL"] = "1" + env["DD_TELEMETRY_LOG_COLLECTION_ENABLED"] = "0.1" + env["DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED"] = str(collect_dependencies) + env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true" + + _, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import time; time.sleep(1.5)", env=env) + assert status == 0, stderr + assert stderr == b"" + + extended_events = test_agent_session.get_events("app-extended-heartbeat") + assert len(extended_events) >= 1 + + assert extended_events[0]["payload"]["configurations"] is not None + configurations = test_agent_session.get_configurations() + assert configurations == extended_events[0]["payload"]["configurations"] + + if collect_dependencies: + assert "dependencies" in extended_events[0]["payload"] + assert len(extended_events[0]["payload"]["dependencies"]) > 0, extended_events[0]["payload"] + else: + assert "dependencies" not in extended_events[0]["payload"]