Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/init/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ impl Agent {
) -> Result<(), Box<dyn Error + Send + Sync>> {
let config = self.config;

info!("Starting Rotel.",);
info!("Starting Rotel.");

let resource_attributes = match &config.otel_resource_attributes {
Some(s) => crate::init::parse::parse_key_vals::<String, String>(s)?,
None => Vec::new(),
};

// Initialize the TLS library, we may want to do this conditionally
init_crypto_provider()?;
Expand Down Expand Up @@ -773,7 +778,7 @@ impl Agent {
pipeline_flush_sub.as_mut().map(|sub| sub.subscribe()),
build_traces_batch_config(config.batch.clone()),
config.otlp_with_trace_processor.clone(),
config.otel_resource_attributes.clone(),
resource_attributes.clone(),
);

let log_traces = config.debug_log.contains(&DebugLogParam::Traces);
Expand All @@ -800,7 +805,7 @@ impl Agent {
pipeline_flush_sub.as_mut().map(|sub| sub.subscribe()),
build_metrics_batch_config(config.batch.clone()),
config.otlp_with_metrics_processor.clone(),
config.otel_resource_attributes.clone(),
resource_attributes.clone(),
);

let log_metrics = config.debug_log.contains(&DebugLogParam::Metrics);
Expand All @@ -827,7 +832,7 @@ impl Agent {
pipeline_flush_sub.as_mut().map(|sub| sub.subscribe()),
build_logs_batch_config(config.batch.clone()),
config.otlp_with_logs_processor.clone(),
config.otel_resource_attributes.clone(),
resource_attributes.clone(),
);

let log_logs = config.debug_log.contains(&DebugLogParam::Logs);
Expand All @@ -854,7 +859,7 @@ impl Agent {
pipeline_flush_sub.as_mut().map(|sub| sub.subscribe()),
build_metrics_batch_config(config.batch.clone()),
vec![],
config.otel_resource_attributes.clone(),
resource_attributes.clone(),
);

let log_metrics = config.debug_log.contains(&DebugLogParam::Metrics);
Expand Down
7 changes: 4 additions & 3 deletions src/init/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::init::kafka_exporter::KafkaExporterArgs;
use crate::init::kafka_receiver::KafkaReceiverArgs;
use crate::init::otlp_exporter::OTLPExporterArgs;
use crate::init::otlp_receiver::OTLPReceiverArgs;
#[cfg(feature = "prometheus")]
use crate::init::parse;
use crate::init::xray_exporter::XRayExporterArgs;
use crate::topology::debug::DebugVerbosity;
Expand Down Expand Up @@ -93,8 +94,8 @@ pub struct AgentRun {
pub otlp_with_metrics_processor: Vec<String>,

/// Comma-separated, key=value pairs of resource attributes to set
#[arg(long, env = "ROTEL_OTEL_RESOURCE_ATTRIBUTES", value_parser = parse::parse_key_val::<String, String>, value_delimiter = ',')]
pub otel_resource_attributes: Vec<(String, String)>,
#[arg(long, env = "ROTEL_OTEL_RESOURCE_ATTRIBUTES")]
pub otel_resource_attributes: Option<String>,

/// Enable reporting of internal telemetry
#[arg(long, env = "ROTEL_ENABLE_INTERNAL_TELEMETRY", default_value = "false")]
Expand Down Expand Up @@ -176,7 +177,7 @@ impl Default for AgentRun {
otlp_with_trace_processor: Vec::new(),
otlp_with_logs_processor: Vec::new(),
otlp_with_metrics_processor: Vec::new(),
otel_resource_attributes: Vec::new(),
otel_resource_attributes: None,
enable_internal_telemetry: false,
batch: BatchArgs::default(),
exporter: None,
Expand Down
34 changes: 17 additions & 17 deletions src/init/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,17 +237,17 @@ impl TryIntoConfig for ExporterArgs {
.unwrap_or_else(|| Endpoint::Base(endpoint.unwrap().clone()));

if pipeline_type == PipelineType::Metrics {
Ok(ExporterConfig::Otlp(otlp.into_exporter_config(
Ok(ExporterConfig::Otlp(otlp.try_into_exporter_config(
"metrics",
endpoint,
global_retry,
)))
)?))
} else {
Ok(ExporterConfig::Otlp(otlp.into_exporter_config(
Ok(ExporterConfig::Otlp(otlp.try_into_exporter_config(
"internal_metrics",
endpoint,
global_retry,
)))
)?))
}
}
PipelineType::Logs => {
Expand All @@ -260,11 +260,11 @@ impl TryIntoConfig for ExporterArgs {
.map(|e| Endpoint::Full(e.clone()))
.unwrap_or_else(|| Endpoint::Base(endpoint.unwrap().clone()));

Ok(ExporterConfig::Otlp(otlp.into_exporter_config(
Ok(ExporterConfig::Otlp(otlp.try_into_exporter_config(
"logs",
endpoint,
global_retry,
)))
)?))
}
PipelineType::Traces => {
if endpoint.is_none() && otlp.traces_endpoint.is_none() {
Expand All @@ -276,11 +276,11 @@ impl TryIntoConfig for ExporterArgs {
.map(|e| Endpoint::Full(e.clone()))
.unwrap_or_else(|| Endpoint::Base(endpoint.unwrap().clone()));

Ok(ExporterConfig::Otlp(otlp.into_exporter_config(
Ok(ExporterConfig::Otlp(otlp.try_into_exporter_config(
"traces",
endpoint,
global_retry,
)))
)?))
}
}
}
Expand Down Expand Up @@ -411,7 +411,7 @@ impl TryIntoConfig for ExporterArgs {
if k.brokers.is_empty() {
return Err("must specify a Kafka broker address".into());
}
Ok(ExporterConfig::Kafka(k.build_config()))
Ok(ExporterConfig::Kafka(k.build_config()?))
}
}
}
Expand All @@ -423,7 +423,7 @@ pub(crate) fn get_receivers_config(
) -> Result<HashMap<Receiver, ReceiverConfig>, BoxError> {
if config.receivers.is_none() && config.receiver.is_none() && otlp_default_receiver {
let mut map = HashMap::new();
map.insert(Receiver::Otlp, get_receiver_config(config, Receiver::Otlp));
map.insert(Receiver::Otlp, get_receiver_config(config, Receiver::Otlp)?);
return Ok(map);
}
if config.receivers.is_some() && config.receiver.is_some() {
Expand All @@ -432,7 +432,7 @@ pub(crate) fn get_receivers_config(

if let Some(receiver) = config.receiver {
let mut map = HashMap::new();
map.insert(receiver, get_receiver_config(config, receiver));
map.insert(receiver, get_receiver_config(config, receiver)?);
return Ok(map);
}

Expand All @@ -441,7 +441,7 @@ pub(crate) fn get_receivers_config(
let rec: Vec<&str> = recs.split(",").collect();
for receiver in rec {
let receiver: Receiver = receiver.parse()?;
receivers.insert(receiver, get_receiver_config(config, receiver));
receivers.insert(receiver, get_receiver_config(config, receiver)?);
}
}
Ok(receivers)
Expand Down Expand Up @@ -634,16 +634,16 @@ fn args_from_env_prefix(exporter_type: &str, prefix: &str) -> Result<ExporterArg
}

// Function is currently small but expect it will grow over time so splitting out rather than inlining for now.
fn get_receiver_config(config: &AgentRun, receiver: Receiver) -> ReceiverConfig {
match receiver {
fn get_receiver_config(config: &AgentRun, receiver: Receiver) -> Result<ReceiverConfig, BoxError> {
Ok(match receiver {
Receiver::Otlp => ReceiverConfig::Otlp(OTLPReceiverConfig::from(&config.otlp_receiver)),
#[cfg(feature = "rdkafka")]
Receiver::Kafka => ReceiverConfig::Kafka(config.kafka_receiver.build_config()),
Receiver::Kafka => ReceiverConfig::Kafka(config.kafka_receiver.build_config()?),
#[cfg(feature = "fluent_receiver")]
Receiver::Fluent => ReceiverConfig::Fluent(config.fluent_receiver.build_config()),
#[cfg(feature = "file_receiver")]
Receiver::File => ReceiverConfig::File(config.file_receiver.build_config()),
}
})
}

fn get_single_exporter_config(
Expand Down Expand Up @@ -737,7 +737,7 @@ fn get_single_exporter_config(

#[cfg(feature = "rdkafka")]
Exporter::Kafka => {
let kafka_config = config.kafka_exporter.build_config();
let kafka_config = config.kafka_exporter.build_config()?;
cfg.traces.push(ExporterConfig::Kafka(kafka_config.clone()));
cfg.metrics
.push(ExporterConfig::Kafka(kafka_config.clone()));
Expand Down
25 changes: 14 additions & 11 deletions src/init/kafka_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::exporters::kafka::config::{
AcknowledgementMode, Compression, KafkaExporterConfig, PartitionerType, SaslMechanism,
SecurityProtocol, SerializationFormat,
};
use crate::init::parse::parse_key_val;
use clap::{Args, ValueEnum};
use serde::Deserialize;
use tower::BoxError;

#[derive(Debug, Args, Clone, Deserialize)]
#[serde(default)]
Expand Down Expand Up @@ -179,12 +179,9 @@ pub struct KafkaExporterArgs {
#[arg(
id("KAFKA_EXPORTER_CUSTOM_CONFIG"),
long("kafka-exporter-custom-config"),
env = "ROTEL_KAFKA_EXPORTER_CUSTOM_CONFIG",
value_parser = parse_key_val::<String, String>,
value_delimiter = ','
env = "ROTEL_KAFKA_EXPORTER_CUSTOM_CONFIG"
)]
#[serde(deserialize_with = "crate::init::parse::deserialize_key_value_pairs")]
pub custom_config: Vec<(String, String)>,
pub custom_config: Option<String>,

/// SASL username for authentication
#[arg(
Expand Down Expand Up @@ -243,7 +240,7 @@ impl Default for KafkaExporterArgs {
partitioner: Default::default(),
partition_metrics_by_resource_attributes: false,
partition_logs_by_resource_attributes: false,
custom_config: vec![],
custom_config: None,
compression: Default::default(),
sasl_username: None,
sasl_password: None,
Expand Down Expand Up @@ -424,7 +421,7 @@ impl From<KafkaPartitionerType> for PartitionerType {
}

impl KafkaExporterArgs {
pub fn build_config(&self) -> KafkaExporterConfig {
pub fn build_config(&self) -> Result<KafkaExporterConfig, BoxError> {
let mut config = KafkaExporterConfig::new(self.brokers.clone())
.with_traces_topic(self.traces_topic.clone())
.with_metrics_topic(self.metrics_topic.clone())
Expand All @@ -445,8 +442,14 @@ impl KafkaExporterArgs {
.with_partition_metrics_by_resource_attributes(
self.partition_metrics_by_resource_attributes,
)
.with_partition_logs_by_resource_attributes(self.partition_logs_by_resource_attributes)
.with_custom_config(self.custom_config.clone());
.with_partition_logs_by_resource_attributes(self.partition_logs_by_resource_attributes);

// Parse custom config from Option<String> to Vec<(String, String)>
let custom_config = match &self.custom_config {
Some(s) => crate::init::parse::parse_key_vals::<String, String>(s)?,
None => Vec::new(),
};
config = config.with_custom_config(custom_config);

// Configure SASL if credentials are provided
if let (Some(username), Some(password), Some(mechanism)) = (
Expand All @@ -464,7 +467,7 @@ impl KafkaExporterArgs {
config.security_protocol = Some(self.security_protocol.into());
}

config
Ok(config)
}
}

Expand Down
25 changes: 14 additions & 11 deletions src/init/kafka_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// SPDX-License-Identifier: Apache-2.0

use crate::init::parse::parse_key_val;
use crate::receivers::kafka::config::KafkaReceiverConfig;
use clap::Args;
use serde::Deserialize;
use tower::BoxError;

#[derive(Default, Debug, Args, Clone, Deserialize)]
#[serde(default)]
Expand Down Expand Up @@ -252,7 +252,7 @@ pub struct KafkaReceiverArgs {
#[arg(
id("KAFKA_RECEIVER_SECURITY_PROTOCOL"),
long("kafka-receiver-security-protocol"),
env = "ROTEL_KAFKA_RECEIVER_SECURITY_PROTOCOL",
env = "ROTEL_KAFKA_RECEIVER_SECURITY_PROTOCOL",
default_value = None,
)]
pub security_protocol: Option<crate::receivers::kafka::config::SecurityProtocol>,
Expand Down Expand Up @@ -294,16 +294,13 @@ pub struct KafkaReceiverArgs {
#[arg(
id("KAFKA_RECEIVER_CUSTOM_CONFIG"),
long("kafka-receiver-custom-config"),
env = "ROTEL_KAFKA_RECEIVER_CUSTOM_CONFIG",
value_parser = parse_key_val::<String, String>,
value_delimiter = ','
env = "ROTEL_KAFKA_RECEIVER_CUSTOM_CONFIG"
)]
#[serde(deserialize_with = "crate::init::parse::deserialize_key_value_pairs")]
pub custom_config: Vec<(String, String)>,
pub custom_config: Option<String>,
}

impl KafkaReceiverArgs {
pub fn build_config(&self) -> KafkaReceiverConfig {
pub fn build_config(&self) -> Result<KafkaReceiverConfig, BoxError> {
let mut config = KafkaReceiverConfig::new(self.brokers.clone(), self.group_id.clone())
.with_traces(self.traces)
.with_metrics(self.metrics)
Expand All @@ -320,8 +317,14 @@ impl KafkaReceiverArgs {
self.fetch_max_wait_ms,
self.max_partition_fetch_bytes,
)
.with_isolation_level(self.isolation_level)
.with_custom_config(self.custom_config.clone());
.with_isolation_level(self.isolation_level);

// Parse custom config from Option<String> to Vec<(String, String)>
let custom_config = match &self.custom_config {
Some(s) => crate::init::parse::parse_key_vals::<String, String>(s)?,
None => Vec::new(),
};
config = config.with_custom_config(custom_config);

// Set topics if provided
if let Some(ref topic) = self.traces_topic {
Expand Down Expand Up @@ -363,7 +366,7 @@ impl KafkaReceiverArgs {
self.ssl_key_password.clone(),
);

config
Ok(config)
}
}

Expand Down
Loading