From b68d2fc9c90c702f243f44423d19625cce7fd30b Mon Sep 17 00:00:00 2001 From: Shivansh Mathur Date: Sun, 21 Jun 2026 01:02:07 +0530 Subject: [PATCH] enhance timestamp parsing for logs with string and integer formats --- ...346_cloudwatch_logs_parse_timestamp.fix.md | 3 + src/sinks/aws_cloudwatch_logs/config.rs | 1 + .../aws_cloudwatch_logs/request_builder.rs | 96 ++++++++++++++++++- 3 files changed, 98 insertions(+), 2 deletions(-) create mode 100644 changelog.d/15346_cloudwatch_logs_parse_timestamp.fix.md diff --git a/changelog.d/15346_cloudwatch_logs_parse_timestamp.fix.md b/changelog.d/15346_cloudwatch_logs_parse_timestamp.fix.md new file mode 100644 index 0000000000000..1336e48cc0610 --- /dev/null +++ b/changelog.d/15346_cloudwatch_logs_parse_timestamp.fix.md @@ -0,0 +1,3 @@ +The `aws_cloudwatch_logs` sink now parses string- and integer-encoded `.timestamp` values instead of silently replacing them with the current time. Integer values are interpreted as Unix seconds. If the value cannot be parsed, the sink falls back to the current time and emits a warning. 
+ +authors: shivansh-mathur diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 6ec24d7e093e2..3c4db38e12aee 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -223,6 +223,7 @@ impl SinkConfig for CloudwatchLogsSinkConfig { stream_template: self.stream_name.clone(), transformer, encoder, + timezone: cx.globals.timezone(), }, service: svc, diff --git a/src/sinks/aws_cloudwatch_logs/request_builder.rs b/src/sinks/aws_cloudwatch_logs/request_builder.rs index 6d89cd4b6d6e0..e8254acf83bf2 100644 --- a/src/sinks/aws_cloudwatch_logs/request_builder.rs +++ b/src/sinks/aws_cloudwatch_logs/request_builder.rs @@ -16,6 +16,7 @@ use crate::{ internal_events::AwsCloudwatchLogsMessageSizeError, sinks::{aws_cloudwatch_logs::CloudwatchKey, util::metadata::RequestMetadataBuilder}, template::Template, + types::Conversion, }; // Estimated maximum size of InputLogEvent is 50 bytes with an empty message @@ -55,6 +56,7 @@ pub struct CloudwatchRequestBuilder { pub stream_template: Template, pub transformer: Transformer, pub encoder: Encoder<()>, + pub timezone: vector_lib::TimeZone, } impl CloudwatchRequestBuilder { @@ -86,7 +88,21 @@ impl CloudwatchRequestBuilder { let timestamp = match event.as_mut_log().remove_timestamp() { Some(Value::Timestamp(ts)) => ts.timestamp_millis(), - _ => Utc::now().timestamp_millis(), + Some(value) => { + // See issue #15346: parse non-Timestamp `.timestamp` values before falling back. 
+ match Conversion::Timestamp(self.timezone).convert::<Value>(value.coerce_to_bytes()) + { + Ok(Value::Timestamp(ts)) => ts.timestamp_millis(), + Ok(_) => { + unreachable!("Conversion::Timestamp always produces a Value::Timestamp") + } + Err(error) => { + warn!(message = "Unable to parse timestamp, using current time instead.", %error, internal_log_rate_limit = true); + Utc::now().timestamp_millis() + } + } + } + None => Utc::now().timestamp_millis(), }; let finalizers = event.take_finalizers(); @@ -139,10 +155,11 @@ impl ByteSizeOf for CloudwatchRequest { #[cfg(test)] mod tests { - use chrono::Utc; + use chrono::{DateTime, Utc}; use vector_lib::{config::log_schema, event::LogEvent}; use super::{CloudwatchRequestBuilder, MAX_MESSAGE_SIZE}; + use crate::event::Value; #[test] fn test() { @@ -151,6 +168,7 @@ mod tests { stream_template: "stream".try_into().unwrap(), transformer: Default::default(), encoder: Default::default(), + timezone: Default::default(), }; let timestamp = Utc::now(); let message = "event message"; @@ -169,6 +187,7 @@ mod tests { stream_template: "stream".try_into().unwrap(), transformer: Default::default(), encoder: Default::default(), + timezone: Default::default(), }; let timestamp = Utc::now(); @@ -179,4 +198,77 @@ mod tests { let request = request_builder.build(event.into()); assert!(request.is_none(), "Expected None for oversized log event"); } + + #[test] + fn test_string_timestamp_is_parsed() { + let mut request_builder = CloudwatchRequestBuilder { + group_template: "group".try_into().unwrap(), + stream_template: "stream".try_into().unwrap(), + transformer: Default::default(), + encoder: Default::default(), + timezone: Default::default(), + }; + let message = "event message"; + let mut event = LogEvent::from(message); + event.insert( + log_schema().timestamp_key_target_path().unwrap(), + Value::Bytes("2022-11-24T21:06:29.000Z".into()), + ); + + let request = request_builder.build(event.into()).unwrap(); + let expected = 
DateTime::parse_from_rfc3339("2022-11-24T21:06:29.000Z") + .unwrap() + .timestamp_millis(); + assert_eq!(request.timestamp, expected); + } + + #[test] + fn test_integer_timestamp_is_parsed_as_unix_seconds() { + let mut request_builder = CloudwatchRequestBuilder { + group_template: "group".try_into().unwrap(), + stream_template: "stream".try_into().unwrap(), + transformer: Default::default(), + encoder: Default::default(), + timezone: Default::default(), + }; + let message = "event message"; + let mut event = LogEvent::from(message); + // Integer timestamps are interpreted as Unix seconds. + event.insert( + log_schema().timestamp_key_target_path().unwrap(), + Value::Integer(1669324389), + ); + + let request = request_builder.build(event.into()).unwrap(); + assert_eq!(request.timestamp, 1669324389 * 1000); + } + + #[test] + fn test_unparseable_timestamp_falls_back_to_now() { + let mut request_builder = CloudwatchRequestBuilder { + group_template: "group".try_into().unwrap(), + stream_template: "stream".try_into().unwrap(), + transformer: Default::default(), + encoder: Default::default(), + timezone: Default::default(), + }; + let message = "event message"; + let mut event = LogEvent::from(message); + event.insert( + log_schema().timestamp_key_target_path().unwrap(), + Value::Bytes("not-a-timestamp".into()), + ); + + let before = Utc::now().timestamp_millis(); + let request = request_builder.build(event.into()).unwrap(); + let after = Utc::now().timestamp_millis(); + + assert!( + request.timestamp >= before && request.timestamp <= after, + "Expected timestamp to fall back to current time, got {} (window: {}..{})", + request.timestamp, + before, + after + ); + } }