Skip to content
4 changes: 4 additions & 0 deletions ddtrace/internal/settings/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class TelemetryConfig(DDConfig):
INSTALL_TIME = DDConfig.v(t.Optional[str], "instrumentation.install_time", default=None)
FORCE_START = DDConfig.v(bool, "instrumentation_telemetry.tests.force_app_started", default=False, private=True)
LOG_COLLECTION_ENABLED = DDConfig.v(bool, "telemetry.log_collection.enabled", default=True)
# Interval should be fixed to 24 hours. The value should only be overridden in tests.
EXTENDED_HEARTBEAT_INTERVAL = DDConfig.v(
float, "telemetry.extended_heartbeat_interval", default=3600 * 24.0, private=True
)


config = TelemetryConfig()
44 changes: 25 additions & 19 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,15 @@ def __init__(self, is_periodic: bool = True, agentless: Optional[bool] = None) -
self._namespace = MetricNamespace()
self._logs: set[dict[str, Any]] = set()
self._events_queue: list[dict[str, Any]] = []
self._queued_configs: list[dict] = []
self._sent_configs: list[dict] = []
self._configuration_queue: list[dict] = []
self._imported_dependencies: dict[str, str] = dict()
self._modules_already_imported: set[str] = set()
self._product_enablement: dict[str, bool] = {product.value: False for product in TELEMETRY_APM_PRODUCT}
self._previous_product_enablement: dict[str, bool] = {}
self._extended_time = time.monotonic()
# The extended heartbeat interval is set to 24 hours
self._extended_heartbeat_interval = 3600 * 24
self._extended_heartbeat_interval = config.EXTENDED_HEARTBEAT_INTERVAL

self.started = False

Expand Down Expand Up @@ -296,15 +297,22 @@ def _report_app_started(self, register_app_shutdown: bool = True) -> Optional[di
}
return payload

def _report_heartbeat(self) -> Optional[dict[str, Any]]:
if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval:
self._extended_time += self._extended_heartbeat_interval
return {
"dependencies": [
def _report_heartbeat(self) -> dict[str, Any]:
"""Report a heartbeat to keep RC connections alive.

Extended heartbeats (non-empty payload) include configurations and dependencies;
regular heartbeats return an empty payload. Callers should queue this after
configuration and dependencies events so values are accurately reported.
"""
payload = {}
if time.monotonic() - self._extended_time > self._extended_heartbeat_interval:
payload["configurations"] = self._sent_configs
if config.DEPENDENCY_COLLECTION:
payload["dependencies"] = [
{"name": name, "version": version} for name, version in self._imported_dependencies.items()
]
}
return None
self._extended_time += self._extended_heartbeat_interval
return payload

def _report_integrations(self) -> list[dict]:
"""Flushes and returns a list of all queued integrations"""
Expand All @@ -316,8 +324,9 @@ def _report_integrations(self) -> list[dict]:
def _report_configurations(self) -> list[dict]:
"""Flushes and returns a list of all queued configurations"""
with self._service_lock:
configurations = self._configuration_queue
self._configuration_queue = []
configurations = self._queued_configs
self._sent_configs.extend(configurations)
self._queued_configs = []
return configurations

def _report_dependencies(self) -> Optional[list[dict[str, Any]]]:
Expand Down Expand Up @@ -386,13 +395,13 @@ def add_configuration(

with self._service_lock:
config["seq_id"] = next(self._sequence_configurations)
self._configuration_queue.append(config)
self._queued_configs.append(config)

def add_configurations(self, configuration_list: list[tuple[str, str, str]]) -> None:
"""Creates and queues a list of configurations"""
with self._service_lock:
for name, value, origin in configuration_list:
self._configuration_queue.append(
self._queued_configs.append(
{
"name": name,
"origin": origin,
Expand Down Expand Up @@ -631,14 +640,10 @@ def periodic(self, force_flush: bool = False, shutting_down: bool = False) -> No
if shutting_down and not forksafe.is_fork_child():
events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.SHUTDOWN))

# Always include a heartbeat to keep RC connections alive
# Extended heartbeat should be queued after app-dependencies-loaded event. This
# ensures that that imported dependencies are accurately reported.
if heartbeat_payload := self._report_heartbeat():
# Extended heartbeat report dependencies while regular heartbeats report empty payloads
events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT))
else:
events.append(self._get_event({}, TELEMETRY_EVENT_TYPE.HEARTBEAT))
events.append(self._get_event(heartbeat_payload, TELEMETRY_EVENT_TYPE.HEARTBEAT))

# Get any queued events (ie metrics and logs from previous periodic calls) and combine with current batch
if queued_events := self._report_events():
Expand Down Expand Up @@ -672,7 +677,8 @@ def reset_queues(self) -> None:
self._namespace.flush()
self._logs = set()
self._imported_dependencies = {}
self._configuration_queue = []
self._queued_configs = []
self._sent_configs = []

def _report_events(self) -> list[dict]:
"""Flushes and returns a list of all telemtery event"""
Expand Down
28 changes: 28 additions & 0 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,31 @@ def test_telemetry_multiple_sources(test_agent_session, run_python_code_in_subpr

assert sorted_configs[3]["value"] is True
assert sorted_configs[3]["origin"] == "code"


@pytest.mark.parametrize("collect_dependencies", [True, False])
def test_extended_heartbeat_sent(collect_dependencies, ddtrace_run_python_code_in_subprocess, test_agent_session):
"""Assert at least one extended heartbeat is sent when the extended heartbeat interval has elapsed."""

env = os.environ.copy()
env["_DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL"] = "1"
env["DD_TELEMETRY_LOG_COLLECTION_ENABLED"] = "0.1"
env["DD_TELEMETRY_DEPENDENCY_COLLECTION_ENABLED"] = str(collect_dependencies)
env["_DD_INSTRUMENTATION_TELEMETRY_TESTS_FORCE_APP_STARTED"] = "true"

_, stderr, status, _ = ddtrace_run_python_code_in_subprocess("import time; time.sleep(1.5)", env=env)
assert status == 0, stderr
assert stderr == b""

extended_events = test_agent_session.get_events("app-extended-heartbeat")
assert len(extended_events) >= 1

assert extended_events[0]["payload"]["configurations"] is not None
configurations = test_agent_session.get_configurations()
assert configurations == extended_events[0]["payload"]["configurations"]

if collect_dependencies:
assert "dependencies" in extended_events[0]["payload"]
assert len(extended_events[0]["payload"]["dependencies"]) > 0, extended_events[0]["payload"]
else:
assert "dependencies" not in extended_events[0]["payload"]