From 4122fd11a0ffc84796e07d612923371627262634 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Tue, 23 Jun 2026 16:20:57 -0400 Subject: [PATCH] chore(gcp chronicle sink): remove timberio/chronicle-emulator dependency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the two ignored integration tests (publish_events, publish_invalid_events) with nothing — they are tracked in #24133. Convert invalid_credentials into a self-contained unit test using wiremock to mock the OAuth token endpoint, eliminating the need for the hosted timberio/chronicle-emulator Docker image. --- Cargo.toml | 2 - .../gcp_chronicle/chronicle_unstructured.rs | 165 +++++------------- tests/integration/gcp/config/compose.yaml | 9 - tests/integration/gcp/config/test.yaml | 2 - 4 files changed, 44 insertions(+), 134 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 324843447172d..31da1df0dda70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -994,7 +994,6 @@ all-integration-tests = [ "aws-integration-tests", "axiom-integration-tests", "azure-integration-tests", - "chronicle-integration-tests", "clickhouse-integration-tests", "databend-integration-tests", "datadog-agent-integration-tests", @@ -1065,7 +1064,6 @@ aws-sns-integration-tests = ["sinks-aws_sns"] axiom-integration-tests = ["sinks-axiom"] azure-blob-integration-tests = ["sinks-azure_blob"] azure-logs-ingestion-integration-tests = ["sinks-azure_logs_ingestion"] -chronicle-integration-tests = ["sinks-gcp"] clickhouse-integration-tests = ["sinks-clickhouse"] databend-integration-tests = ["sinks-databend"] datadog-agent-integration-tests = ["sources-datadog_agent"] diff --git a/src/sinks/gcp_chronicle/chronicle_unstructured.rs b/src/sinks/gcp_chronicle/chronicle_unstructured.rs index e8898a8da768a..6e9154efe6950 100644 --- a/src/sinks/gcp_chronicle/chronicle_unstructured.rs +++ b/src/sinks/gcp_chronicle/chronicle_unstructured.rs @@ -670,135 +670,58 @@ impl Service for ChronicleService { } } -#[cfg(all(test, feature = "chronicle-integration-tests"))] -mod integration_tests { - use reqwest::{Client, Method, Response}; - use serde::{Deserialize, Serialize}; - use vector_lib::event::{BatchNotifier, BatchStatus}; +#[cfg(test)] +mod unit_tests { + use std::io::Write; - use super::*; - use crate::test_util::{ - components::{ - COMPONENT_ERROR_TAGS, SINK_TAGS, run_and_assert_sink_compliance, - run_and_assert_sink_error, - }, - random_events_with_stream, random_string, trace_init, - }; - - const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS"; - - fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig { - let address = std::env::var(ADDRESS_ENV_VAR).unwrap(); - let config = format!( - indoc! { r#" - endpoint = "{}" - customer_id = "customer id" - namespace = "namespace" - credentials_path = "{}" - log_type = "{}" - encoding.codec = "text" - "# }, - address, auth_path, log_type - ); + use tempfile::NamedTempFile; + use wiremock::{Mock, MockServer, ResponseTemplate, matchers::method}; - let config: ChronicleUnstructuredConfig = toml::from_str(&config).unwrap(); - config - } - - async fn config_build( - log_type: &str, - auth_path: &str, - ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> { - let cx = SinkContext::default(); - config(log_type, auth_path).build(cx).await - } + use super::*; + use crate::test_util::{random_string, trace_init}; - #[ignore = "https://github.com/vectordotdev/vector/issues/24133"] #[tokio::test] - async fn publish_events() { + async fn invalid_credentials_rejected_by_oauth_server() { trace_init(); - let log_type = random_string(10); - let (sink, healthcheck) = config_build(&log_type, "tests/integration/gcp/config/auth.json") - .await - .expect("Building sink failed"); - - healthcheck.await.expect("Health check failed"); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (input, events) = random_events_with_stream(100, 100, Some(batch)); - run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - - let response = pull_messages(&log_type).await; - let messages = response - .into_iter() - .map(|message| message.log_text) - .collect::>(); - assert_eq!(input.len(), messages.len()); - for i in 0..input.len() { - let data = serde_json::to_value(&messages[i]).unwrap(); - let expected = serde_json::to_value(input[i].as_log().get("message").unwrap()).unwrap(); - assert_eq!(data, expected); - } - } + let mock_server = MockServer::start().await; + Mock::given(method("POST")) + .respond_with(ResponseTemplate::new(401)) + .mount(&mock_server) + .await; + + let base_url = mock_server.uri(); + let creds = serde_json::json!({ + "type": "service_account", + "project_id": "test", + "private_key_id": "1", + "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDouHdVDVz0/M6PGe60Kf/g0nyOxCvbZgiUAZNzFimXDU+RpZ54\n6/oETl6VpRkbp8a4Xb8avll2lsamdHvGcsgnjJXdpp7LfWYLqHEpn0/XFM+womXg\nvglWCDwAsXmrmwpZKEC82mmyFigheyPA/sfuN6z+wa7P5B65xzIdDQX7nQIDAQAB\nAoGBANID/rUDrTrtll8v8Oon6OH0MjIIuOdzKhSfY3h9rKTDf2YaB2xq0KLoMpVr\ne8AoZb5l45t34naR1M3M2xKY7SSDAVJFfg/3Vxeot86DQ23IGLXj7LnNxXnvklXa\nEXaD8LNz/MXxS7/Lu0R+lEtjEkf23+BRb11fL6Q/EDToNHnhAkEA/FnwHhKMc/Bm\nXsS8bENuZP3SV2v7TU6MFTtXJFmsoZBxHnsM8UUi0gq9gBnApmdhy7v2N/Mv9gFI\nviSdr7vm1QJBAOwV3cHAciRHVK71TweOWIJKZBM9ZVut0VDs5GrBYZxGMBiOr3BI\ns7+0ugTKxVimuei6c0KNXw1kg3Vtc5+utakCQQDklAbXBpAomJHxt5zBKBc/7VXx\nEANyk/p5ZOXbLEsdkXuVU3p2tNwEi+v4s9r4H97Kr3goV+SSnbkpWntm6fn9AkBn\nFnE7rlXpA4C12QYGTaDWW7dxM0j0DGUvChH/j6uYuok73+o5hHWAy2DCwOwFduAN\nAIVd1S9hQLeqaf2oB3jpAkEAnRT+bAlMjtUOBO6XPNO4IbYwWJvGMcIEO7zu6AdB\nPJy3/U+bLimxFuYdrs6SnIHIUVdl35AlckHqzT54a5YKqQ==\n-----END RSA PRIVATE KEY-----", + "client_email": "test@test.com", + "client_id": "1", + "auth_uri": format!("{base_url}/o/oauth2/auth"), + "token_uri": format!("{base_url}/token"), + "auth_provider_x509_cert_url": format!("{base_url}/oauth2/v1/certs"), + "client_x509_cert_url": "https://example.com" + }); - #[tokio::test] - async fn invalid_credentials() { - trace_init(); + let mut tmp = NamedTempFile::new().unwrap(); + write!(tmp, "{creds}").unwrap(); let log_type = random_string(10); - // Test with an auth file that doesnt match the public key sent to the dummy chronicle server. - let sink = config_build(&log_type, "tests/integration/gcp/config/invalidauth.json").await; - - assert!(sink.is_err()) - } - - #[ignore = "https://github.com/vectordotdev/vector/issues/24133"] - #[tokio::test] - async fn publish_invalid_events() { - trace_init(); - - // The chronicle-emulator we are testing against is setup so a `log_type` of "INVALID" - // will return a `400 BAD_REQUEST`. - let log_type = "INVALID"; - let (sink, healthcheck) = config_build(log_type, "tests/integration/gcp/config/auth.json") - .await - .expect("Building sink failed"); - - healthcheck.await.expect("Health check failed"); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (_input, events) = random_events_with_stream(100, 100, Some(batch)); - run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await; - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); - } - - #[derive(Clone, Debug, Deserialize, Serialize)] - pub struct Log { - customer_id: String, - namespace: String, - log_type: String, - log_text: String, - ts_rfc3339: String, - } - - async fn request(method: Method, path: &str, log_type: &str) -> Response { - let address = std::env::var(ADDRESS_ENV_VAR).unwrap(); - let url = format!("{address}/{path}"); - Client::new() - .request(method.clone(), &url) - .query(&[("log_type", log_type)]) - .send() - .await - .unwrap_or_else(|_| panic!("Sending {method} request to {url} failed")) - } - - async fn pull_messages(log_type: &str) -> Vec { - request(Method::GET, "logs", log_type) - .await - .json::>() - .await - .expect("Extracting pull data failed") + let cx = SinkContext::default(); + let config: ChronicleUnstructuredConfig = serde_yaml::from_str(&format!( + indoc! { r#" + endpoint: "http://127.0.0.1:1" + customer_id: test-customer + credentials_path: "{}" + log_type: "{}" + encoding: + codec: text + "# }, + tmp.path().display(), + log_type + )) + .unwrap(); + assert!(config.build(cx).await.is_err()); } } diff --git a/tests/integration/gcp/config/compose.yaml b/tests/integration/gcp/config/compose.yaml index d911f9fad8872..59f04ce785c7d 100644 --- a/tests/integration/gcp/config/compose.yaml +++ b/tests/integration/gcp/config/compose.yaml @@ -6,15 +6,6 @@ services: environment: - PUBSUB_PROJECT1=testproject,topic1:subscription1 - PUBSUB_PROJECT2=sourceproject,topic2:subscription2 - chronicle-emulator: - image: docker.io/timberio/chronicle-emulator:${CONFIG_VERSION} - ports: - - 3000:3000 - volumes: - - ./public.pem:/public.pem:ro - command: - - -p - - /public.pem networks: default: diff --git a/tests/integration/gcp/config/test.yaml b/tests/integration/gcp/config/test.yaml index 474826afd6aed..8df9dd6fec80b 100644 --- a/tests/integration/gcp/config/test.yaml +++ b/tests/integration/gcp/config/test.yaml @@ -1,12 +1,10 @@ features: - gcp-integration-tests - - chronicle-integration-tests test_filter: "::gcp" env: EMULATOR_ADDRESS: http://gcloud-pubsub:8681 - CHRONICLE_ADDRESS: http://chronicle-emulator:3000 matrix: version: [latest]