Skip to content
127 changes: 119 additions & 8 deletions libdd-telemetry/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ impl TelemetryWorker {
self.deadlines
.schedule_event(LifecycleAction::FlushData)
.unwrap();

#[allow(clippy::unwrap_used)]
self.deadlines
.schedule_event(LifecycleAction::ExtendedHeartbeat)
.unwrap();
Comment thread
khanayan123 marked this conversation as resolved.
self.data.started = true;
}
}
Expand Down Expand Up @@ -494,15 +499,13 @@ impl TelemetryWorker {
Ok(()) => self.payload_sent_success(&extended_hb),
Err(err) => self.log_err(&err),
}
// Only re-schedule self. Resetting `FlushData` here would replace its
// existing deadline with `now + heartbeat_interval`, starving FlushData
// when `extended_heartbeat_interval < heartbeat_interval` because each
// ExtendedHeartbeat firing pushes FlushData out before it can fire.
#[allow(clippy::unwrap_used)]
self.deadlines
.schedule_events(
&mut [
LifecycleAction::FlushData,
LifecycleAction::ExtendedHeartbeat,
]
.into_iter(),
)
.schedule_event(LifecycleAction::ExtendedHeartbeat)
.unwrap();
}
Lifecycle(Stop) => {
Expand Down Expand Up @@ -1283,7 +1286,10 @@ mod tests {
use crate::worker::http_client::header::{
DD_PARENT_SESSION_ID, DD_ROOT_SESSION_ID, DD_SESSION_ID,
};
use crate::worker::{TelemetryWorker, TelemetryWorkerBuilder, TelemetryWorkerHandle};
use crate::worker::{
LifecycleAction, TelemetryActions, TelemetryWorker, TelemetryWorkerBuilder,
TelemetryWorkerFlavor, TelemetryWorkerHandle,
};
use libdd_common::{http_common, Endpoint};
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -1436,6 +1442,111 @@ mod tests {
);
}

/// Test helper: builds a [`TelemetryWorker`] configured with the given `flavor`.
///
/// The builder receives fixed placeholder identity strings, the endpoint is set to
/// `http://127.0.0.1:1`, and the runtime id to `"rid"`. The worker is built with
/// `tokio::runtime::Handle::current()`, so callers are expected to run inside a
/// Tokio runtime (e.g. a `#[tokio::test]`).
fn build_test_worker_with_flavor(flavor: TelemetryWorkerFlavor) -> TelemetryWorker {
    let mut builder = TelemetryWorkerBuilder::new(
        "h".into(),
        "svc".into(),
        "lang".into(),
        "1".into(),
        "tv".into(),
    );
    builder.flavor = flavor;
    builder.runtime_id = Some("rid".into());

    let endpoint = Endpoint::from_slice("http://127.0.0.1:1");
    builder.config.set_endpoint(endpoint).unwrap();

    // `build_worker` returns a tuple; only its second element (the worker) is needed.
    let runtime = Some(tokio::runtime::Handle::current());
    builder.build_worker(runtime).1
}

/// After dispatching `Start` on a `Full`-flavor worker, every action listed in the
/// scheduler's `delays` must also appear in its `deadlines`.
///
/// The check iterates over `delays` instead of naming specific variants, so a
/// periodic action added later is covered without touching this test.
#[tokio::test]
#[cfg_attr(miri, ignore)] // reqwest in dispatch_action
async fn full_flavor_start_schedules_every_periodic_action() {
    let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full);

    let start = TelemetryActions::Lifecycle(LifecycleAction::Start);
    let _ = worker.dispatch_action(start).await;

    // Snapshot which actions currently hold a deadline and which have a delay.
    let scheduled: Vec<LifecycleAction> = worker
        .deadlines
        .deadlines
        .iter()
        .map(|(_, action)| *action)
        .collect();
    let periodic: Vec<LifecycleAction> = worker
        .deadlines
        .delays
        .iter()
        .map(|(_, action)| *action)
        .collect();

    assert!(
        !periodic.is_empty(),
        "scheduler should have periodic actions"
    );
    for ev in periodic.iter() {
        let is_armed = scheduled.iter().any(|armed| armed == ev);
        assert!(
            is_armed,
            "{ev:?} has a delay but was not scheduled on Start; scheduled={scheduled:?}",
        );
    }
}

/// Negative guard for the `MetricsLogs` flavor: after `Start`, `FlushMetricAggr` and
/// `FlushData` hold deadlines while `ExtendedHeartbeat` does not.
///
/// If a later change makes this flavor schedule `ExtendedHeartbeat`, this test has
/// to be updated deliberately.
#[tokio::test]
#[cfg_attr(miri, ignore)] // reqwest in build_worker
async fn metrics_logs_flavor_start_does_not_schedule_extended_heartbeat() {
    let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::MetricsLogs);

    let start = TelemetryActions::Lifecycle(LifecycleAction::Start);
    let _ = worker.dispatch_metrics_logs_action(start).await;

    let scheduled: Vec<LifecycleAction> = worker
        .deadlines
        .deadlines
        .iter()
        .map(|(_, action)| *action)
        .collect();

    // Expected periodic actions for this flavor.
    assert!(scheduled.contains(&LifecycleAction::FlushMetricAggr));
    assert!(scheduled.contains(&LifecycleAction::FlushData));

    // The extended heartbeat must be absent.
    let no_extended_heartbeat = scheduled
        .iter()
        .all(|action| *action != LifecycleAction::ExtendedHeartbeat);
    assert!(
        no_extended_heartbeat,
        "MetricsLogs should not schedule ExtendedHeartbeat; scheduled={scheduled:?}",
    );
}

/// Regression: when `extended_heartbeat_interval < heartbeat_interval`, the
/// ExtendedHeartbeat handler must not reset FlushData's deadline. If it did, each
/// firing would push FlushData to `now + heartbeat_interval` and the next
/// (sooner) ExtendedHeartbeat would push it again — starving FlushData forever.
#[tokio::test]
#[cfg_attr(miri, ignore)] // reqwest in dispatch_action
async fn extended_heartbeat_does_not_reset_flush_data() {
let mut worker = build_test_worker_with_flavor(TelemetryWorkerFlavor::Full);

let _ = worker
.dispatch_action(TelemetryActions::Lifecycle(LifecycleAction::Start))
.await;

let flush_data_before = worker
.deadlines
.deadlines
.iter()
.find(|(_, k)| *k == LifecycleAction::FlushData)
.map(|(d, _)| *d)
.expect("FlushData scheduled on Start");

let _ = worker
.dispatch_action(TelemetryActions::Lifecycle(
LifecycleAction::ExtendedHeartbeat,
))
.await;

let flush_data_after = worker
.deadlines
.deadlines
.iter()
.find(|(_, k)| *k == LifecycleAction::FlushData)
.map(|(d, _)| *d)
.expect("FlushData should still be scheduled after ExtendedHeartbeat fires");

assert_eq!(
flush_data_before, flush_data_after,
"ExtendedHeartbeat must not reset FlushData's deadline",
);
}

mod reset {
use super::super::*;
use crate::data::{
Expand Down
14 changes: 3 additions & 11 deletions libdd-telemetry/src/worker/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ impl<T: Clone + Eq> Scheduler<T> {
Some((*i, key))
}

/// Schedules every event yielded by `events`, using a single timestamp captured
/// before iteration starts as the `from` instant for all of them.
///
/// # Errors
///
/// Stops at the first event for which `schedule_event_with_from` returns `Err` and
/// propagates that error; events remaining in the iterator are not scheduled.
pub fn schedule_events(&mut self, events: &mut impl Iterator<Item = T>) -> Result<(), T> {
    let from = self.now.now();
    events.try_for_each(|event| self.schedule_event_with_from(event, from))
}

fn schedule_event_with_from(&mut self, event: T, from: Instant) -> Result<(), T> {
let (delay, _) = match self.delays.iter().find(|(_, k)| k == &event) {
Some(s) => s,
Expand Down Expand Up @@ -108,9 +100,9 @@ mod tests {
(Duration::from_millis(40), 2),
]);
scheduler.now = Now::Mock(start);
scheduler
.schedule_events(&mut [0, 1, 2].into_iter())
.unwrap();
scheduler.schedule_event(0).unwrap();
scheduler.schedule_event(1).unwrap();
scheduler.schedule_event(2).unwrap();

scheduler.now = Now::Mock(start + Duration::from_millis(9));
expect_scheduled(
Expand Down
Loading