diff --git a/changelog.d/13467_kubernetes_logs_uid_keyed_pod_store.fix.md b/changelog.d/13467_kubernetes_logs_uid_keyed_pod_store.fix.md new file mode 100644 index 0000000000000..b60e5f477a0ea --- /dev/null +++ b/changelog.d/13467_kubernetes_logs_uid_keyed_pod_store.fix.md @@ -0,0 +1,3 @@ +The `kubernetes_logs` source now tracks pod metadata by pod UID rather than by name and namespace. Previously, when a pod was deleted and another was created reusing the same name and namespace (for example during a `StatefulSet` rollout or a same-node restart), the recreated pod could be evicted from Vector's internal metadata, causing Vector to stop collecting that pod's logs entirely and emit `Failed to annotate event with pod metadata` errors. + +Both incarnations are now tracked independently while their log files are still being read, so logs from the old pod are annotated with the correct metadata and the new pod is never evicted. diff --git a/src/kubernetes/mod.rs b/src/kubernetes/mod.rs index 834d616a951e6..9cac7bd45a458 100644 --- a/src/kubernetes/mod.rs +++ b/src/kubernetes/mod.rs @@ -9,6 +9,8 @@ pub mod meta_cache; pub mod pod_manager_logic; +pub mod pod_store; pub mod reflector; +pub use pod_store::{PodStore, pod_reflector}; pub use reflector::custom_reflector; diff --git a/src/kubernetes/pod_store.rs b/src/kubernetes/pod_store.rs new file mode 100644 index 0000000000000..a6d44c4313f0e --- /dev/null +++ b/src/kubernetes/pod_store.rs @@ -0,0 +1,278 @@ +//! A UID-keyed store of pod metadata for the `kubernetes_logs` source. +//! +//! kube's reflector `Store` is keyed by name and namespace and therefore +//! mirrors only the *current* state of the cluster: when a pod is deleted and +//! another is created reusing the same name and namespace, the store can only +//! hold the newer one. Vector, however, tails log files that outlive their pod +//! (the kubelet retains them after deletion), and those files are identified on +//! disk by the pod UID. 
+//! To annotate such files with the metadata of the exact
+//! incarnation that produced them, we keep our own store keyed by the
+//! identifier that appears in the log path.
+//!
+//! See <https://github.com/vectordotdev/vector/issues/13467>.
+
+use std::{
+    collections::{HashMap, HashSet},
+    sync::{Arc, RwLock},
+    time::Duration,
+};
+
+use futures::StreamExt;
+use futures_util::Stream;
+use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};
+use kube::runtime::watcher;
+use tokio::pin;
+use tokio_util::time::DelayQueue;
+
+use super::pod_manager_logic::extract_static_pod_config_hashsum;
+
+/// Returns the identifier used to locate a pod's logs on disk: the static pod
+/// config hashsum for mirror pods, otherwise the pod UID.
+///
+/// This is the same value used as the `uid` component of the pod log directory
+/// (see `build_pod_logs_directory`), so files parsed from disk can be looked up
+/// against a [`PodStore`].
+///
+/// Returns `None` when the metadata carries neither a static pod config
+/// hashsum nor a UID.
+pub fn pod_uid_for_path(metadata: &ObjectMeta) -> Option<String> {
+    extract_static_pod_config_hashsum(metadata)
+        .or(metadata.uid.as_deref())
+        .map(ToOwned::to_owned)
+}
+
+/// A cloneable handle to a UID-keyed store of pods.
+///
+/// Clones share the same underlying map, so a pod inserted through one handle
+/// is visible through every other handle.
+#[derive(Clone, Default)]
+pub struct PodStore {
+    // Keyed by the value returned from `pod_uid_for_path`.
+    pods: Arc<RwLock<HashMap<String, Arc<Pod>>>>,
+}
+
+impl PodStore {
+    /// Creates an empty store.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Returns the pod tracked under the given path identifier, if any.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the internal lock has been poisoned.
+    pub fn get(&self, uid: &str) -> Option<Arc<Pod>> {
+        self.pods.read().unwrap().get(uid).cloned()
+    }
+
+    /// Returns all currently tracked pods.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the internal lock has been poisoned.
+    pub fn list(&self) -> Vec<Arc<Pod>> {
+        self.pods.read().unwrap().values().cloned().collect()
+    }
+
+    /// Inserts or replaces the pod under its path identifier. Pods without
+    /// an identifier cannot be matched to a log path and are ignored.
+    fn insert(&self, pod: Pod) {
+        if let Some(uid) = pod_uid_for_path(&pod.metadata) {
+            self.pods.write().unwrap().insert(uid, Arc::new(pod));
+        }
+    }
+
+    /// Stops tracking the pod stored under `uid`, if any.
+    fn remove(&self, uid: &str) {
+        self.pods.write().unwrap().remove(uid);
+    }
+
+    /// Returns a snapshot of the identifiers currently tracked.
+    fn uids(&self) -> HashSet<String> {
+        self.pods.read().unwrap().keys().cloned().collect()
+    }
+}
+
+/// Schedules (or restarts) the delayed removal of `uid`.
+///
+/// If a removal is already pending for `uid`, its timer is restarted so the
+/// pod is retained for a full `delay` after the most recent deletion.
+fn schedule_deletion(
+    delay_queue: &mut DelayQueue<String>,
+    pending: &mut HashMap<String, tokio_util::time::delay_queue::Key>,
+    uid: String,
+    delay: Duration,
+) {
+    if let Some(key) = pending.get(&uid) {
+        delay_queue.reset(key, delay);
+    } else {
+        let key = delay_queue.insert(uid.clone(), delay);
+        pending.insert(uid, key);
+    }
+}
+
+/// Cancels the pending removal of `uid`, if one is scheduled.
+fn cancel_deletion(
+    delay_queue: &mut DelayQueue<String>,
+    pending: &mut HashMap<String, tokio_util::time::delay_queue::Key>,
+    uid: &str,
+) {
+    if let Some(key) = pending.remove(uid) {
+        // `try_remove` rather than `remove`: the latter panics on an unknown
+        // key, and a bookkeeping slip must not take the reflector down.
+        delay_queue.try_remove(&key);
+    }
+}
+
+/// Maintains a [`PodStore`] from a watcher stream, delaying deletions by
+/// `delay_deletion` so that logs from a deleted pod can still be annotated
+/// while its files are being drained.
+///
+/// Because the store is keyed by the pod's path identifier (UID), a pod that is
+/// deleted and recreated reusing the same name and namespace occupies a
+/// distinct key. The delayed deletion of the old incarnation therefore can
+/// never evict the new one. See
+/// <https://github.com/vectordotdev/vector/issues/13467>.
+///
+/// Each scheduled deletion is tracked by its `DelayQueue` key. Re-applying a
+/// pod cancels its pending deletion, and deleting it again restarts the timer,
+/// so a stale timer left over from an earlier deletion can never evict a pod
+/// before `delay_deletion` has elapsed since its most recent deletion.
+///
+/// # Panics
+///
+/// Panics if `stream` ends; a watcher stream is expected to be infinite.
+pub async fn pod_reflector<W>(store: PodStore, stream: W, delay_deletion: Duration)
+where
+    W: Stream<Item = watcher::Result<watcher::Event<Pod>>>,
+{
+    pin!(stream);
+    let mut delay_queue: DelayQueue<String> = DelayQueue::default();
+    // Deletions that are scheduled but have not fired yet, keyed by UID.
+    // Invariant: a UID is present here iff its key is still in `delay_queue`.
+    // A UID in the store is therefore "live" exactly when it is absent here.
+    let mut pending: HashMap<String, tokio_util::time::delay_queue::Key> = HashMap::new();
+    let mut init_buffer: Vec<Pod> = Vec::new();
+    loop {
+        tokio::select! {
+            result = stream.next() => {
+                match result {
+                    Some(Ok(event)) => match event {
+                        // Immediately reconcile `Apply` events, cancelling any
+                        // deletion still pending for the same UID.
+                        watcher::Event::Apply(pod) => {
+                            if let Some(uid) = pod_uid_for_path(&pod.metadata) {
+                                cancel_deletion(&mut delay_queue, &mut pending, &uid);
+                            }
+                            store.insert(pod);
+                        }
+                        // Delay reconciling `Delete` events.
+                        watcher::Event::Delete(pod) => {
+                            if let Some(uid) = pod_uid_for_path(&pod.metadata) {
+                                schedule_deletion(
+                                    &mut delay_queue,
+                                    &mut pending,
+                                    uid,
+                                    delay_deletion,
+                                );
+                            }
+                        }
+                        // Begin buffering a relist.
+                        watcher::Event::Init => {
+                            init_buffer.clear();
+                        }
+                        watcher::Event::InitApply(pod) => {
+                            init_buffer.push(pod);
+                        }
+                        // Reconcile the relist: apply everything observed, and
+                        // delay the deletion of pods that are no longer present.
+                        watcher::Event::InitDone => {
+                            let new_uids: HashSet<String> = init_buffer
+                                .iter()
+                                .filter_map(|pod| pod_uid_for_path(&pod.metadata))
+                                .collect();
+                            for uid in store.uids() {
+                                // Pods already awaiting deletion keep their
+                                // original deadline.
+                                if !new_uids.contains(&uid) && !pending.contains_key(&uid) {
+                                    schedule_deletion(
+                                        &mut delay_queue,
+                                        &mut pending,
+                                        uid,
+                                        delay_deletion,
+                                    );
+                                }
+                            }
+                            for pod in init_buffer.drain(..) {
+                                if let Some(uid) = pod_uid_for_path(&pod.metadata) {
+                                    cancel_deletion(&mut delay_queue, &mut pending, &uid);
+                                }
+                                store.insert(pod);
+                            }
+                        }
+                    },
+                    Some(Err(error)) => {
+                        warn!(message = "Watcher stream received an error. Retrying.", ?error);
+                    }
+                    // The watcher stream should never yield `None`.
+                    None => unreachable!("a watcher Stream never ends"),
+                }
+            }
+            result = delay_queue.next(), if !delay_queue.is_empty() => {
+                match result {
+                    Some(expired) => {
+                        let uid = expired.into_inner();
+                        // A timer only fires if it was not cancelled by a
+                        // re-apply, so the pod is no longer live.
+                        pending.remove(&uid);
+                        store.remove(&uid);
+                    }
+                    // DelayQueue returns None only when exhausted, but the
+                    // branch is disabled while the queue is empty.
+                    None => unreachable!("an empty DelayQueue is never polled"),
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::BTreeMap, time::Duration};
+
+    use futures::channel::mpsc;
+    use futures_util::SinkExt;
+    use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};
+    use kube::runtime::watcher;
+
+    use super::{PodStore, pod_reflector, pod_uid_for_path};
+
+    fn pod(name: &str, uid: &str) -> Pod {
+        Pod {
+            metadata: ObjectMeta {
+                name: Some(name.to_string()),
+                namespace: Some("ns".to_string()),
+                uid: Some(uid.to_string()),
+                ..ObjectMeta::default()
+            },
+            ..Pod::default()
+        }
+    }
+
+    #[test]
+    fn pod_uid_for_path_prefers_static_hashsum() {
+        let mut p = pod("foo", "real-uid");
+        assert_eq!(pod_uid_for_path(&p.metadata).as_deref(), Some("real-uid"));
+
+        let mut annotations = BTreeMap::new();
+        annotations.insert(
+            "kubernetes.io/config.mirror".to_string(),
+            "hash123".to_string(),
+        );
+        p.metadata.annotations = Some(annotations);
+        assert_eq!(pod_uid_for_path(&p.metadata).as_deref(), Some("hash123"));
+    }
+
+    #[tokio::test]
+    async fn redeleted_pod_is_retained_for_full_delay_after_last_deletion() {
+        // Delete -> Apply -> Delete on the same UID: the timer started by the
+        // first deletion must not evict the pod before the delay has elapsed
+        // since the second deletion.
+        let store = PodStore::new();
+        let (mut tx, rx) = mpsc::channel(5);
+        tx.send(Ok(watcher::Event::Apply(pod("foo", "uid-a"))))
+            .await
+            .unwrap();
+        tx.send(Ok(watcher::Event::Delete(pod("foo", "uid-a"))))
+            .await
+            .unwrap();
+        tokio::spawn(pod_reflector(store.clone(), rx, Duration::from_secs(2)));
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+        tx.send(Ok(watcher::Event::Apply(pod("foo", "uid-a"))))
+            .await
+            .unwrap();
+        tx.send(Ok(watcher::Event::Delete(pod("foo", "uid-a"))))
+            .await
+            .unwrap();
+
+        // ~2.5s in: the first timer (due at ~2s) must have been cancelled; the
+        // second deletion (at ~1s) keeps the pod until ~3s.
+        tokio::time::sleep(Duration::from_millis(1500)).await;
+        assert!(store.get("uid-a").is_some());
+
+        tokio::time::sleep(Duration::from_secs(2)).await;
+        assert!(store.get("uid-a").is_none());
+    }
+
+    #[tokio::test]
+    async fn recreated_pod_keeps_both_incarnations_during_delay() {
+        // A pod is deleted and recreated reusing the same name/namespace with a
+        // new UID. Both incarnations must be addressable during the delay so
+        // that the old incarnation's still-draining files annotate correctly,
+        // and the new incarnation is never evicted.
+ let store = PodStore::new(); + let (mut tx, rx) = mpsc::channel(5); + tx.send(Ok(watcher::Event::Apply(pod("foo-0", "uid-a")))) + .await + .unwrap(); + tx.send(Ok(watcher::Event::Delete(pod("foo-0", "uid-a")))) + .await + .unwrap(); + tx.send(Ok(watcher::Event::Apply(pod("foo-0", "uid-b")))) + .await + .unwrap(); + tokio::spawn(pod_reflector(store.clone(), rx, Duration::from_secs(2))); + + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(store.get("uid-a").is_some()); + assert!(store.get("uid-b").is_some()); + + tokio::time::sleep(Duration::from_secs(5)).await; + assert!(store.get("uid-a").is_none()); + assert!(store.get("uid-b").is_some()); + } + + #[tokio::test] + async fn reapplied_pod_survives_pending_deletion() { + let store = PodStore::new(); + let (mut tx, rx) = mpsc::channel(5); + tx.send(Ok(watcher::Event::Apply(pod("foo", "uid-a")))) + .await + .unwrap(); + tx.send(Ok(watcher::Event::Delete(pod("foo", "uid-a")))) + .await + .unwrap(); + tx.send(Ok(watcher::Event::Apply(pod("foo", "uid-a")))) + .await + .unwrap(); + tokio::spawn(pod_reflector(store.clone(), rx, Duration::from_secs(2))); + + tokio::time::sleep(Duration::from_secs(5)).await; + assert!(store.get("uid-a").is_some()); + } + + #[tokio::test] + async fn relist_retains_absent_pod_during_delay() { + let store = PodStore::new(); + let (mut tx, rx) = mpsc::channel(8); + tx.send(Ok(watcher::Event::Apply(pod("a", "uid-a")))) + .await + .unwrap(); + tx.send(Ok(watcher::Event::Apply(pod("b", "uid-b")))) + .await + .unwrap(); + // A relist that no longer reports pod `a`. 
+ tx.send(Ok(watcher::Event::Init)).await.unwrap(); + tx.send(Ok(watcher::Event::InitApply(pod("b", "uid-b")))) + .await + .unwrap(); + tx.send(Ok(watcher::Event::InitDone)).await.unwrap(); + tokio::spawn(pod_reflector(store.clone(), rx, Duration::from_secs(2))); + + tokio::time::sleep(Duration::from_secs(1)).await; + assert!(store.get("uid-a").is_some()); + assert!(store.get("uid-b").is_some()); + + tokio::time::sleep(Duration::from_secs(5)).await; + assert!(store.get("uid-a").is_none()); + assert!(store.get("uid-b").is_some()); + } +} diff --git a/src/sources/kubernetes_logs/k8s_paths_provider.rs b/src/sources/kubernetes_logs/k8s_paths_provider.rs index 8b127b344112d..8f925a051001a 100644 --- a/src/sources/kubernetes_logs/k8s_paths_provider.rs +++ b/src/sources/kubernetes_logs/k8s_paths_provider.rs @@ -9,12 +9,12 @@ use kube::runtime::reflector::{ObjectRef, store::Store}; use vector_lib::file_source::paths_provider::PathsProvider; use super::path_helpers::build_pod_logs_directory; -use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum; +use crate::kubernetes::pod_store::{PodStore, pod_uid_for_path}; /// A paths provider implementation that uses the state obtained from the /// the k8s API. pub struct K8sPathsProvider { - pod_state: Store, + pod_state: PodStore, namespace_state: Store, include_paths: Vec, exclude_paths: Vec, @@ -24,7 +24,7 @@ pub struct K8sPathsProvider { impl K8sPathsProvider { /// Create a new [`K8sPathsProvider`]. 
pub const fn new( - pod_state: Store, + pod_state: PodStore, namespace_state: Store, include_paths: Vec, exclude_paths: Vec, @@ -44,7 +44,7 @@ impl PathsProvider for K8sPathsProvider { type IntoIter = Vec; fn paths(&self) -> Vec { - let state = self.pod_state.state(); + let state = self.pod_state.list(); state .into_iter() @@ -103,15 +103,12 @@ fn extract_pod_logs_directory(pod: &Pod) -> Option { let namespace = metadata.namespace.as_ref()?; let name = metadata.name.as_ref()?; - let uid = if let Some(static_pod_config_hashsum) = extract_static_pod_config_hashsum(metadata) { - // If there's a static pod config hashsum - use it instead of uid. - static_pod_config_hashsum - } else { - // In the common case - just fallback to the real pod uid. - metadata.uid.as_ref()? - }; + // The static pod config hashsum is preferred over the real UID for mirror + // pods; `pod_uid_for_path` encapsulates that choice so the directory and the + // `PodStore` key always agree. + let uid = pod_uid_for_path(metadata)?; - Some(build_pod_logs_directory(namespace, name, uid)) + Some(build_pod_logs_directory(namespace, name, &uid)) } const CONTAINER_EXCLUSION_ANNOTATION_KEY: &str = "vector.dev/exclude-containers"; diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs index fa001972a59d7..06a475f7ba967 100644 --- a/src/sources/kubernetes_logs/mod.rs +++ b/src/sources/kubernetes_logs/mod.rs @@ -51,7 +51,7 @@ use crate::{ KubernetesLogsEventNodeAnnotationError, KubernetesLogsEventsReceived, KubernetesLogsPodInfo, StreamClosedError, }, - kubernetes::{custom_reflector, meta_cache::MetaCache}, + kubernetes::{PodStore, custom_reflector, meta_cache::MetaCache, pod_reflector}, shutdown::ShutdownSignal, sources, sources::kubernetes_logs::partial_events_merger::merge_partial_events, @@ -734,13 +734,15 @@ impl Source { ) .backoff(watcher::DefaultBackoff::default()); - let pod_store_w = reflector::store::Writer::default(); - let pod_state = pod_store_w.as_reader(); - let 
pod_cacher = MetaCache::new(); + // Pods are tracked in a UID-keyed store (rather than kube's + // name+namespace-keyed reflector store) so that a pod recreated under a + // reused name cannot evict the metadata of a different incarnation whose + // log files are still being read. See + // https://github.com/vectordotdev/vector/issues/13467. + let pod_store = PodStore::new(); - reflectors.push(tokio::spawn(custom_reflector( - pod_store_w, - pod_cacher, + reflectors.push(tokio::spawn(pod_reflector( + pod_store.clone(), pod_watcher, delay_deletion, ))); @@ -795,13 +797,13 @@ impl Source { ))); let paths_provider = K8sPathsProvider::new( - pod_state.clone(), + pod_store.clone(), ns_state.clone(), include_paths, exclude_paths, insert_namespace_fields, ); - let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace); + let annotator = PodMetadataAnnotator::new(pod_store, pod_fields_spec, log_namespace); let ns_annotator = NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace); let node_annotator = NodeMetadataAnnotator::new(node_state, node_field_spec, log_namespace); diff --git a/src/sources/kubernetes_logs/pod_metadata_annotator.rs b/src/sources/kubernetes_logs/pod_metadata_annotator.rs index 5f62af1f638cc..4090bfbd8956f 100644 --- a/src/sources/kubernetes_logs/pod_metadata_annotator.rs +++ b/src/sources/kubernetes_logs/pod_metadata_annotator.rs @@ -6,7 +6,6 @@ use k8s_openapi::{ api::core::v1::{Container, ContainerStatus, Pod, PodSpec, PodStatus}, apimachinery::pkg::apis::meta::v1::ObjectMeta, }; -use kube::runtime::reflector::{ObjectRef, store::Store}; use vector_lib::{ config::{LegacyKey, LogNamespace}, configurable::configurable_component, @@ -21,7 +20,10 @@ use super::{ Config, path_helpers::{LogFileInfo, parse_log_file_path}, }; -use crate::event::{Event, LogEvent}; +use crate::{ + event::{Event, LogEvent}, + kubernetes::pod_store::PodStore, +}; /// Configuration for how the events are enriched with Pod metadata. 
#[configurable_component] @@ -175,7 +177,7 @@ impl Default for FieldsSpec { /// Annotate the event with pod metadata. pub struct PodMetadataAnnotator { - pods_state_reader: Store, + pods_state_reader: PodStore, fields_spec: FieldsSpec, log_namespace: LogNamespace, } @@ -183,7 +185,7 @@ pub struct PodMetadataAnnotator { impl PodMetadataAnnotator { /// Create a new [`PodMetadataAnnotator`]. pub const fn new( - pods_state_reader: Store, + pods_state_reader: PodStore, fields_spec: FieldsSpec, log_namespace: LogNamespace, ) -> Self { @@ -200,8 +202,11 @@ impl PodMetadataAnnotator { pub fn annotate<'a>(&self, event: &mut Event, file: &'a str) -> Option> { let log = event.as_mut_log(); let file_info = parse_log_file_path(file)?; - let obj = ObjectRef::::new(file_info.pod_name).within(file_info.pod_namespace); - let resource = self.pods_state_reader.get(&obj)?; + // Look up by the pod's path identifier (UID) so logs from a deleted pod + // are annotated with the metadata of the exact incarnation that wrote + // them, even if a pod with the same name and namespace has since been + // created. See https://github.com/vectordotdev/vector/issues/13467. + let resource = self.pods_state_reader.get(file_info.pod_uid)?; let pod: &Pod = resource.as_ref(); annotate_from_file_info(log, &self.fields_spec, &file_info, self.log_namespace);