Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,6 @@ all-integration-tests = [
"aws-integration-tests",
"axiom-integration-tests",
"azure-integration-tests",
"chronicle-integration-tests",
"clickhouse-integration-tests",
"databend-integration-tests",
"datadog-agent-integration-tests",
Expand Down Expand Up @@ -1065,7 +1064,6 @@ aws-sns-integration-tests = ["sinks-aws_sns"]
axiom-integration-tests = ["sinks-axiom"]
azure-blob-integration-tests = ["sinks-azure_blob"]
azure-logs-ingestion-integration-tests = ["sinks-azure_logs_ingestion"]
chronicle-integration-tests = ["sinks-gcp"]
clickhouse-integration-tests = ["sinks-clickhouse"]
databend-integration-tests = ["sinks-databend"]
datadog-agent-integration-tests = ["sources-datadog_agent"]
Expand Down
165 changes: 44 additions & 121 deletions src/sinks/gcp_chronicle/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,135 +670,58 @@ impl Service<ChronicleRequest> for ChronicleService {
}
}

#[cfg(all(test, feature = "chronicle-integration-tests"))]
mod integration_tests {
use reqwest::{Client, Method, Response};
use serde::{Deserialize, Serialize};
use vector_lib::event::{BatchNotifier, BatchStatus};
#[cfg(test)]
mod unit_tests {
use std::io::Write;

use super::*;
use crate::test_util::{
components::{
COMPONENT_ERROR_TAGS, SINK_TAGS, run_and_assert_sink_compliance,
run_and_assert_sink_error,
},
random_events_with_stream, random_string, trace_init,
};

const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS";

fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig {
let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
let config = format!(
indoc! { r#"
endpoint = "{}"
customer_id = "customer id"
namespace = "namespace"
credentials_path = "{}"
log_type = "{}"
encoding.codec = "text"
"# },
address, auth_path, log_type
);
use tempfile::NamedTempFile;
use wiremock::{Mock, MockServer, ResponseTemplate, matchers::method};

let config: ChronicleUnstructuredConfig = toml::from_str(&config).unwrap();
config
}

async fn config_build(
log_type: &str,
auth_path: &str,
) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
let cx = SinkContext::default();
config(log_type, auth_path).build(cx).await
}
use super::*;
use crate::test_util::{random_string, trace_init};

#[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
#[tokio::test]
async fn publish_events() {
async fn invalid_credentials_rejected_by_oauth_server() {
trace_init();

let log_type = random_string(10);
let (sink, healthcheck) = config_build(&log_type, "tests/integration/gcp/config/auth.json")
.await
.expect("Building sink failed");

healthcheck.await.expect("Health check failed");

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input, events) = random_events_with_stream(100, 100, Some(batch));
run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

let response = pull_messages(&log_type).await;
let messages = response
.into_iter()
.map(|message| message.log_text)
.collect::<Vec<_>>();
assert_eq!(input.len(), messages.len());
for i in 0..input.len() {
let data = serde_json::to_value(&messages[i]).unwrap();
let expected = serde_json::to_value(input[i].as_log().get("message").unwrap()).unwrap();
assert_eq!(data, expected);
}
}
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.respond_with(ResponseTemplate::new(401))
.mount(&mock_server)
.await;

let base_url = mock_server.uri();
let creds = serde_json::json!({
"type": "service_account",
"project_id": "test",
"private_key_id": "1",
"private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIICXgIBAAKBgQDouHdVDVz0/M6PGe60Kf/g0nyOxCvbZgiUAZNzFimXDU+RpZ54\n6/oETl6VpRkbp8a4Xb8avll2lsamdHvGcsgnjJXdpp7LfWYLqHEpn0/XFM+womXg\nvglWCDwAsXmrmwpZKEC82mmyFigheyPA/sfuN6z+wa7P5B65xzIdDQX7nQIDAQAB\nAoGBANID/rUDrTrtll8v8Oon6OH0MjIIuOdzKhSfY3h9rKTDf2YaB2xq0KLoMpVr\ne8AoZb5l45t34naR1M3M2xKY7SSDAVJFfg/3Vxeot86DQ23IGLXj7LnNxXnvklXa\nEXaD8LNz/MXxS7/Lu0R+lEtjEkf23+BRb11fL6Q/EDToNHnhAkEA/FnwHhKMc/Bm\nXsS8bENuZP3SV2v7TU6MFTtXJFmsoZBxHnsM8UUi0gq9gBnApmdhy7v2N/Mv9gFI\nviSdr7vm1QJBAOwV3cHAciRHVK71TweOWIJKZBM9ZVut0VDs5GrBYZxGMBiOr3BI\ns7+0ugTKxVimuei6c0KNXw1kg3Vtc5+utakCQQDklAbXBpAomJHxt5zBKBc/7VXx\nEANyk/p5ZOXbLEsdkXuVU3p2tNwEi+v4s9r4H97Kr3goV+SSnbkpWntm6fn9AkBn\nFnE7rlXpA4C12QYGTaDWW7dxM0j0DGUvChH/j6uYuok73+o5hHWAy2DCwOwFduAN\nAIVd1S9hQLeqaf2oB3jpAkEAnRT+bAlMjtUOBO6XPNO4IbYwWJvGMcIEO7zu6AdB\nPJy3/U+bLimxFuYdrs6SnIHIUVdl35AlckHqzT54a5YKqQ==\n-----END RSA PRIVATE KEY-----",
"client_email": "test@test.com",
"client_id": "1",
"auth_uri": format!("{base_url}/o/oauth2/auth"),
"token_uri": format!("{base_url}/token"),
"auth_provider_x509_cert_url": format!("{base_url}/oauth2/v1/certs"),
"client_x509_cert_url": "https://example.com"
});

#[tokio::test]
async fn invalid_credentials() {
trace_init();
let mut tmp = NamedTempFile::new().unwrap();
write!(tmp, "{creds}").unwrap();

let log_type = random_string(10);
// Test with an auth file that doesnt match the public key sent to the dummy chronicle server.
let sink = config_build(&log_type, "tests/integration/gcp/config/invalidauth.json").await;

assert!(sink.is_err())
}

#[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
#[tokio::test]
async fn publish_invalid_events() {
trace_init();

// The chronicle-emulator we are testing against is setup so a `log_type` of "INVALID"
// will return a `400 BAD_REQUEST`.
let log_type = "INVALID";
let (sink, healthcheck) = config_build(log_type, "tests/integration/gcp/config/auth.json")
.await
.expect("Building sink failed");

healthcheck.await.expect("Health check failed");

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (_input, events) = random_events_with_stream(100, 100, Some(batch));
run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Log {
customer_id: String,
namespace: String,
log_type: String,
log_text: String,
ts_rfc3339: String,
}

async fn request(method: Method, path: &str, log_type: &str) -> Response {
let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
let url = format!("{address}/{path}");
Client::new()
.request(method.clone(), &url)
.query(&[("log_type", log_type)])
.send()
.await
.unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
}

async fn pull_messages(log_type: &str) -> Vec<Log> {
request(Method::GET, "logs", log_type)
.await
.json::<Vec<Log>>()
.await
.expect("Extracting pull data failed")
let cx = SinkContext::default();
let config: ChronicleUnstructuredConfig = serde_yaml::from_str(&format!(
indoc! { r#"
endpoint: "http://127.0.0.1:1"
customer_id: test-customer
credentials_path: "{}"
log_type: "{}"
encoding:
codec: text
"# },
tmp.path().display(),
log_type
))
.unwrap();
assert!(config.build(cx).await.is_err());
}
}
9 changes: 0 additions & 9 deletions tests/integration/gcp/config/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,6 @@ services:
environment:
- PUBSUB_PROJECT1=testproject,topic1:subscription1
- PUBSUB_PROJECT2=sourceproject,topic2:subscription2
chronicle-emulator:
image: docker.io/timberio/chronicle-emulator:${CONFIG_VERSION}
ports:
- 3000:3000
volumes:
- ./public.pem:/public.pem:ro
command:
- -p
- /public.pem

networks:
default:
Expand Down
2 changes: 0 additions & 2 deletions tests/integration/gcp/config/test.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
features:
- gcp-integration-tests
- chronicle-integration-tests

test_filter: "::gcp"

env:
EMULATOR_ADDRESS: http://gcloud-pubsub:8681
CHRONICLE_ADDRESS: http://chronicle-emulator:3000

matrix:
version: [latest]
Expand Down
Loading