Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/15346_cloudwatch_logs_parse_timestamp.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `aws_cloudwatch_logs` sink now parses string- and integer-encoded `.timestamp` values instead of silently replacing them with the current time. Integer values are interpreted as Unix seconds. If the value cannot be parsed, the sink falls back to the current time and emits a warning.

authors: shivansh-mathur
1 change: 1 addition & 0 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl SinkConfig for CloudwatchLogsSinkConfig {
stream_template: self.stream_name.clone(),
transformer,
encoder,
timezone: cx.globals.timezone(),
},

service: svc,
Expand Down
96 changes: 94 additions & 2 deletions src/sinks/aws_cloudwatch_logs/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
internal_events::AwsCloudwatchLogsMessageSizeError,
sinks::{aws_cloudwatch_logs::CloudwatchKey, util::metadata::RequestMetadataBuilder},
template::Template,
types::Conversion,
};

// Estimated maximum size of InputLogEvent is 50 bytes with an empty message
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct CloudwatchRequestBuilder {
pub stream_template: Template,
pub transformer: Transformer,
pub encoder: Encoder<()>,
pub timezone: vector_lib::TimeZone,
}

impl CloudwatchRequestBuilder {
Expand Down Expand Up @@ -86,7 +88,21 @@ impl CloudwatchRequestBuilder {

let timestamp = match event.as_mut_log().remove_timestamp() {
Some(Value::Timestamp(ts)) => ts.timestamp_millis(),
_ => Utc::now().timestamp_millis(),
Some(value) => {
// See issue #15346: parse non-Timestamp `.timestamp` values before falling back.
match Conversion::Timestamp(self.timezone).convert::<Value>(value.coerce_to_bytes())
{
Ok(Value::Timestamp(ts)) => ts.timestamp_millis(),
Ok(_) => {
unreachable!("Conversion::Timestamp always produces a Value::Timestamp")
}
Err(error) => {
warn!(message = "Unable to parse timestamp, using current time instead.", %error, internal_log_rate_limit = true);
Utc::now().timestamp_millis()
}
}
}
None => Utc::now().timestamp_millis(),
};

let finalizers = event.take_finalizers();
Expand Down Expand Up @@ -139,10 +155,11 @@ impl ByteSizeOf for CloudwatchRequest {

#[cfg(test)]
mod tests {
use chrono::Utc;
use chrono::{DateTime, Utc};
use vector_lib::{config::log_schema, event::LogEvent};

use super::{CloudwatchRequestBuilder, MAX_MESSAGE_SIZE};
use crate::event::Value;

#[test]
fn test() {
Expand All @@ -151,6 +168,7 @@ mod tests {
stream_template: "stream".try_into().unwrap(),
transformer: Default::default(),
encoder: Default::default(),
timezone: Default::default(),
};
let timestamp = Utc::now();
let message = "event message";
Expand All @@ -169,6 +187,7 @@ mod tests {
stream_template: "stream".try_into().unwrap(),
transformer: Default::default(),
encoder: Default::default(),
timezone: Default::default(),
};

let timestamp = Utc::now();
Expand All @@ -179,4 +198,77 @@ mod tests {
let request = request_builder.build(event.into());
assert!(request.is_none(), "Expected None for oversized log event");
}

#[test]
fn test_string_timestamp_is_parsed() {
    // The timestamp field holds an RFC 3339 string (as bytes) rather than a
    // native `Value::Timestamp`; the built request must carry the parsed
    // instant in milliseconds.
    let raw_timestamp = "2022-11-24T21:06:29.000Z";

    let mut log = LogEvent::from("event message");
    log.insert(
        log_schema().timestamp_key_target_path().unwrap(),
        Value::Bytes(raw_timestamp.into()),
    );

    let mut builder = CloudwatchRequestBuilder {
        group_template: "group".try_into().unwrap(),
        stream_template: "stream".try_into().unwrap(),
        transformer: Default::default(),
        encoder: Default::default(),
        timezone: Default::default(),
    };
    let built = builder.build(log.into()).unwrap();

    let expected_millis = DateTime::parse_from_rfc3339(raw_timestamp)
        .unwrap()
        .timestamp_millis();
    assert_eq!(built.timestamp, expected_millis);
}

#[test]
fn test_integer_timestamp_is_parsed_as_unix_seconds() {
    // An integer in the timestamp field is expected to be read as Unix
    // seconds, so the request timestamp (milliseconds) is the value * 1000.
    let unix_seconds: i64 = 1669324389;

    let mut log = LogEvent::from("event message");
    log.insert(
        log_schema().timestamp_key_target_path().unwrap(),
        Value::Integer(unix_seconds),
    );

    let mut builder = CloudwatchRequestBuilder {
        group_template: "group".try_into().unwrap(),
        stream_template: "stream".try_into().unwrap(),
        transformer: Default::default(),
        encoder: Default::default(),
        timezone: Default::default(),
    };
    let built = builder.build(log.into()).unwrap();

    assert_eq!(built.timestamp, unix_seconds * 1000);
}

#[test]
fn test_unparseable_timestamp_falls_back_to_now() {
    // A timestamp field that cannot be parsed must not fail the build; the
    // request timestamp should instead land inside the wall-clock window
    // sampled immediately around the `build` call.
    let mut log = LogEvent::from("event message");
    log.insert(
        log_schema().timestamp_key_target_path().unwrap(),
        Value::Bytes("not-a-timestamp".into()),
    );

    let mut builder = CloudwatchRequestBuilder {
        group_template: "group".try_into().unwrap(),
        stream_template: "stream".try_into().unwrap(),
        transformer: Default::default(),
        encoder: Default::default(),
        timezone: Default::default(),
    };

    let window_start = Utc::now().timestamp_millis();
    let built = builder.build(log.into()).unwrap();
    let window_end = Utc::now().timestamp_millis();

    assert!(
        (window_start..=window_end).contains(&built.timestamp),
        "Expected timestamp to fall back to current time, got {} (window: {}..{})",
        built.timestamp,
        window_start,
        window_end
    );
}
}
Loading