From d940e367795c9a3dda2e5c085f0d9c651aa85f2a Mon Sep 17 00:00:00 2001 From: abrahamanavhoeba-alt Date: Wed, 17 Dec 2025 05:11:34 -0500 Subject: [PATCH 1/3] fix(apiserver): filter invalid IPs in node lookup [#391] - Modified find_node_by_simple_key() to skip invalid IPs (0.0.0.0, 127.0.0.1, empty) - Iterates through all nodes to find first valid IP instead of just returning first found - Added new find_node_by_target_ip() function for specific IP lookups - Prevents node registration from returning wrong/unusable IP addresses --- src/server/apiserver/src/node/node_lookup.rs | 43 +++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/src/server/apiserver/src/node/node_lookup.rs b/src/server/apiserver/src/node/node_lookup.rs index 762652a3..604becf0 100644 --- a/src/server/apiserver/src/node/node_lookup.rs +++ b/src/server/apiserver/src/node/node_lookup.rs @@ -12,17 +12,30 @@ use prost::Message; use std::error::Error; /// Find a node by IP address from simplified node keys +/// Filters out invalid IPs like 0.0.0.0 and empty strings pub async fn find_node_by_simple_key() -> Option { println!("Checking simplified node keys in etcd..."); match etcd::get_all_with_prefix("nodes/").await { Ok(kvs) => { println!("Found {} simplified node keys", kvs.len()); - if let Some(kv) = kvs.first() { + // Iterate through all nodes and find the first valid IP + for kv in kvs { println!("Node key: {}", kv.key); let ip_address = kv.key.trim_start_matches("nodes/"); - println!("Found node IP directly from key: {}", ip_address); + + // Skip invalid IPs: empty, 0.0.0.0, or localhost when looking for remote nodes + if ip_address.is_empty() + || ip_address == "0.0.0.0" + || ip_address == "127.0.0.1" + { + println!("Skipping invalid IP: {}", ip_address); + continue; + } + + println!("Found valid node IP from key: {}", ip_address); return Some(ip_address.to_string()); } + println!("No valid node IPs found in simplified keys"); None } Err(e) => { @@ -32,6 +45,32 @@ pub async fn find_node_by_simple_key() -> Option { } } +/// Find a node by specific target IP address from simplified node keys +/// Returns the IP if it exists and is valid, None otherwise +pub async fn find_node_by_target_ip(target_ip: &str) -> Option { + if target_ip.is_empty() || target_ip == "0.0.0.0" { + return None; + } + + println!("Looking for specific node IP: {}", target_ip); + let key = format!("nodes/{}", target_ip); + + match etcd::get(&key).await { + Ok(Some(_)) => { + println!("Found target node IP in etcd: {}", target_ip); + Some(target_ip.to_string()) + } + Ok(None) => { + println!("Target node IP not found in etcd: {}", target_ip); + None + } + Err(e) => { + println!("Error checking for target IP {}: {}", target_ip, e); + None + } + } +} + /// Find a node directly from etcd using cluster/nodes/ prefix pub async fn find_node_from_etcd() -> Option { println!("Checking cluster/nodes/ prefix in etcd..."); From 04c658627d6f04751e538c9b5cd2555f86350c32 Mon Sep 17 00:00:00 2001 From: abrahamanavhoeba-alt Date: Wed, 17 Dec 2025 05:12:05 -0500 Subject: [PATCH 2/3] fix(filtergateway): skip DDS subscription when no conditions [#389] - Check if scenario has valid conditions before subscribing to DDS topic - Skip subscription when topic name is empty to prevent dummy data - Added logging for skipped subscriptions for debugging - Fixes issue where empty conditions caused unwanted DDS data publishing --- src/player/filtergateway/src/manager.rs | 87 +++++++++++++++---------- 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/src/player/filtergateway/src/manager.rs b/src/player/filtergateway/src/manager.rs index da33b5d3..7eaa2702 100644 --- a/src/player/filtergateway/src/manager.rs +++ b/src/player/filtergateway/src/manager.rs @@ -90,23 +90,34 @@ impl FilterGatewayManager { for scenario in etcd_scenario { let scenario: Scenario = serde_yaml::from_str(&scenario)?; println!("Scenario: {:?}", scenario); - let topic_name = scenario - .get_conditions() - .as_ref() - .map(|cond| cond.get_operand_value()) - .unwrap_or_default(); - let data_type_name = scenario - .get_conditions() - .as_ref() - .map(|cond| cond.get_operand_value()) - .unwrap_or_default(); - let mut vehicle_manager = self.vehicle_manager.lock().await; - if let Err(e) = vehicle_manager - .subscribe_topic(topic_name, data_type_name) - .await - { - eprintln!("Error subscribing to vehicle data: {:?}", e); + + // Only subscribe to vehicle data if the scenario has conditions with valid topic + if let Some(conditions) = scenario.get_conditions().as_ref() { + let topic_name = conditions.get_operand_value(); + let data_type_name = conditions.get_operand_value(); + + // Skip subscription if topic name is empty + if !topic_name.is_empty() { + let mut vehicle_manager = self.vehicle_manager.lock().await; + if let Err(e) = vehicle_manager + .subscribe_topic(topic_name, data_type_name) + .await + { + eprintln!("Error subscribing to vehicle data: {:?}", e); + } + } else { + println!( + "Skipping DDS subscription for scenario '{}': empty topic name", + scenario.get_name() + ); + } + } else { + println!( + "Scenario '{}' has no conditions, skipping DDS subscription", + scenario.get_name() + ); } + self.launch_scenario_filter(scenario).await?; } @@ -185,25 +196,31 @@ impl FilterGatewayManager { match param.action { 0 => { // Allow - // Subscribe to vehicle data - let topic_name = param - .scenario - .get_conditions() - .as_ref() - .map(|cond| cond.get_operand_value()) - .unwrap_or_default(); - let data_type_name = param - .scenario - .get_conditions() - .as_ref() - .map(|cond| cond.get_operand_value()) - .unwrap_or_default(); - let mut vehicle_manager = self.vehicle_manager.lock().await; - if let Err(e) = vehicle_manager - .subscribe_topic(topic_name, data_type_name) - .await - { - eprintln!("Error subscribing to vehicle data: {:?}", e); + // Subscribe to vehicle data only if conditions exist and topic is valid + if let Some(conditions) = param.scenario.get_conditions().as_ref() { + let topic_name = conditions.get_operand_value(); + let data_type_name = conditions.get_operand_value(); + + // Only subscribe if topic name is not empty + if !topic_name.is_empty() { + let mut vehicle_manager = self.vehicle_manager.lock().await; + if let Err(e) = vehicle_manager + .subscribe_topic(topic_name, data_type_name) + .await + { + eprintln!("Error subscribing to vehicle data: {:?}", e); + } + } else { + println!( + "Skipping DDS subscription for scenario '{}': empty topic name", + param.scenario.get_name() + ); + } + } else { + println!( + "Scenario '{}' has no conditions, skipping DDS subscription", + param.scenario.get_name() + ); } self.launch_scenario_filter(param.scenario).await?; } From 5574849ed864ce110e1af5efa9b182c3cd096efc Mon Sep 17 00:00:00 2001 From: abrahamanavhoeba-alt Date: Wed, 17 Dec 2025 05:12:31 -0500 Subject: [PATCH 3/3] fix(apiserver): process all scenarios from YAML file [#386] - Changed apply() to return Vec instead of single String - Collects all scenarios from multi-document YAML, not just the last one - Updated apply_artifact() to send all scenarios to FilterGateway - Updated tests to handle new return type - Fixes issue where only last scenario in YAML file was being processed --- src/server/apiserver/src/artifact/mod.rs | 44 +++++++++++++++++------- src/server/apiserver/src/manager.rs | 44 +++++++++++++++--------- 2 files changed, 59 insertions(+), 29 deletions(-) diff --git a/src/server/apiserver/src/artifact/mod.rs b/src/server/apiserver/src/artifact/mod.rs index fea28f09..6b47ad04 100644 --- a/src/server/apiserver/src/artifact/mod.rs +++ b/src/server/apiserver/src/artifact/mod.rs @@ -20,20 +20,32 @@ use common::spec::artifact::Volume; /// ### Parametets /// * `body: &str` - whole yaml string of piccolo artifact /// ### Returns -/// * `Result(String, String)` - scenario and package yaml in downloaded artifact +/// * `Result(Vec)` - vector of scenario yaml strings in downloaded artifact /// ### Description /// Write artifact in etcd -pub async fn apply(body: &str) -> common::Result { +/// Note: Returns ALL scenarios from the YAML, not just the last one +pub async fn apply(body: &str) -> common::Result> { use std::time::Instant; let total_start = Instant::now(); let docs: Vec<&str> = body.split("---").collect(); - let mut scenario_str = String::new(); - let mut package_str = String::new(); + let mut scenario_strs: Vec = Vec::new(); + let mut has_package = false; for doc in docs { + // Skip empty documents + if doc.trim().is_empty() { + continue; + } + let parse_start = Instant::now(); - let value: serde_yaml::Value = serde_yaml::from_str(doc)?; + let value: serde_yaml::Value = match serde_yaml::from_str(doc) { + Ok(v) => v, + Err(e) => { + println!("apply: failed to parse YAML document: {:?}", e); + continue; + } + }; let artifact_str = serde_yaml::to_string(&value)?; let parse_elapsed = parse_start.elapsed(); println!("apply: YAML parse elapsed = {:?}", parse_elapsed); @@ -60,7 +72,8 @@ pub async fn apply(body: &str) -> common::Result { match kind { "Scenario" => { - scenario_str = artifact_str; + // Collect ALL scenarios, not just the last one + scenario_strs.push(artifact_str); // Set initial scenario state to idle via StateManager println!("🔄 SCENARIO STATE INITIALIZATION: ApiServer Setting Initial State"); @@ -98,7 +111,7 @@ pub async fn apply(body: &str) -> common::Result { println!(" ✅ Successfully set scenario {} to idle state", name); } } - "Package" => package_str = artifact_str, + "Package" => has_package = true, _ => continue, }; } @@ -106,13 +119,14 @@ pub async fn apply(body: &str) -> common::Result { let total_elapsed = total_start.elapsed(); println!("apply: total elapsed = {:?}", total_elapsed); + println!("apply: found {} scenarios", scenario_strs.len()); - if scenario_str.is_empty() { + if scenario_strs.is_empty() { Err("There is not any scenario in yaml string".into()) - } else if package_str.is_empty() { + } else if !has_package { Err("There is not any package in yaml string".into()) } else { - Ok(scenario_str) + Ok(scenario_strs) } } @@ -241,9 +255,13 @@ spec: result.err() ); - // Assert: scenario and package strings should not be empty - let scenario = result.unwrap(); - assert!(!scenario.is_empty(), "Scenario YAML should not be empty"); + // Assert: scenarios vector should not be empty + let scenarios = result.unwrap(); + assert!(!scenarios.is_empty(), "Scenarios vector should not be empty"); + assert!( + !scenarios[0].is_empty(), + "First scenario YAML should not be empty" + ); } /// Test apply() with missing `action` field (invalid Scenario) diff --git a/src/server/apiserver/src/manager.rs b/src/server/apiserver/src/manager.rs index 5723136e..f1cf2cf8 100644 --- a/src/server/apiserver/src/manager.rs +++ b/src/server/apiserver/src/manager.rs @@ -134,9 +134,9 @@ async fn reload() { /// ### Description /// write artifact in etcd /// (optional) make yaml, kube files for Bluechi -/// send a gRPC message to gateway +/// send a gRPC message to gateway for ALL scenarios in the YAML pub async fn apply_artifact(body: &str) -> common::Result<()> { - let scenario = crate::artifact::apply(body).await?; + let scenarios = crate::artifact::apply(body).await?; let handle_yaml = HandleYamlRequest { yaml: body.to_string(), @@ -246,11 +246,21 @@ pub async fn apply_artifact(body: &str) -> common::Result<()> { } } - let req: HandleScenarioRequest = HandleScenarioRequest { - action: Action::Apply.into(), - scenario, - }; - crate::grpc::sender::filtergateway::send(req).await?; + // Send ALL scenarios to filtergateway, not just the last one + println!( + "apply_artifact: Sending {} scenarios to FilterGateway", + scenarios.len() + ); + for scenario in scenarios { + let req: HandleScenarioRequest = HandleScenarioRequest { + action: Action::Apply.into(), + scenario, + }; + if let Err(e) = crate::grpc::sender::filtergateway::send(req).await { + eprintln!("Error sending scenario to FilterGateway: {:?}", e); + } + } + Ok(()) } @@ -774,21 +784,23 @@ spec: } /// Mocked version of apply_artifact function. - /// Instead of full production logic, this sends a gRPC request to the mock server. + /// Instead of full production logic, this sends gRPC requests for ALL scenarios to the mock server. async fn apply_artifact( body: &str, grpc_addr: SocketAddr, ) -> Result<(), Box> { - let scenario = crate::artifact::apply(body).await?; + let scenarios = crate::artifact::apply(body).await?; - // Prepare the gRPC request with Apply action - let req = HandleScenarioRequest { - action: Action::Apply.into(), - scenario, - }; + // Send ALL scenarios to the mock gRPC server + for scenario in scenarios { + let req = HandleScenarioRequest { + action: Action::Apply.into(), + scenario, + }; + mock_send(req, grpc_addr).await?; + } - // Send request to the mock gRPC server - mock_send(req, grpc_addr).await + Ok(()) } /// Mocked version of withdraw_artifact function.