Skip to content

Commit 00e1f80

Browse files
committed
rust: bring back pipeline looping
In case of MQTT disconnection threads end and pipeline restarts creating a new connection and restarting the workers Fixes Orange-OpenSource#328 Signed-off-by: Nicolas Buffon <nicolas.buffon@orange.com>
1 parent 2cac309 commit 00e1f80

1 file changed

Lines changed: 81 additions & 78 deletions

File tree

rust/src/client/application/pipeline.rs

Lines changed: 81 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -90,90 +90,93 @@ pub async fn run<A, C, T>(
9090
}
9191
info!("Analysis thread count set to: {}", thread_count);
9292

93-
let (mut mqtt_client, event_loop) = MqttClient::new(&configuration.mqtt);
94-
mqtt_client_subscribe(subscription_list, &mut mqtt_client).await;
95-
96-
let (event_receiver, mqtt_client_listen_handle) = mqtt_client_listen_thread(event_loop);
97-
let (item_receiver, monitoring_receiver, information_receiver, mqtt_router_dispatch_handle) =
98-
mqtt_router_dispatch_thread(subscription_list.to_vec(), event_receiver);
99-
100-
let monitor_reception_handle = monitor_thread(
101-
"received_on".to_string(),
102-
configuration.clone(),
103-
monitoring_receiver,
104-
);
105-
106-
let analysis_pool = threadpool::ThreadPool::with_name("Analysis".to_string(), thread_count);
107-
108-
let (analyser_sender, analyser_receiver) = unbounded();
109-
for _ in 0..thread_count {
110-
let rx = item_receiver.clone();
111-
let tx = analyser_sender.clone();
112-
let configuration_clone = configuration.clone();
113-
let context_clone = context.clone();
114-
let seq_num_clone = sequence_number.clone();
115-
analysis_pool.execute(move || {
116-
info!("Starting analyser generation...");
117-
trace!("Analyser generation closure entering...");
118-
let mut analyser = A::new(configuration_clone, context_clone, seq_num_clone);
119-
loop {
120-
match rx.recv() {
121-
Ok(item) => {
122-
for publish_item in analyser.analyze(item.clone()) {
123-
let cause = Cause::from_exchange(&(item.payload));
124-
match tx.send((publish_item, cause)) {
125-
Ok(()) => trace!("Analyser sent"),
126-
Err(error) => {
127-
error!("Stopped to send analyser: {}", error);
128-
break;
93+
loop {
94+
let configuration = configuration.clone();
95+
let (mut mqtt_client, event_loop) = MqttClient::new(&configuration.mqtt);
96+
mqtt_client_subscribe(subscription_list, &mut mqtt_client).await;
97+
98+
let (event_receiver, mqtt_client_listen_handle) = mqtt_client_listen_thread(event_loop);
99+
let (item_receiver, monitoring_receiver, information_receiver, mqtt_router_dispatch_handle) =
100+
mqtt_router_dispatch_thread(subscription_list.to_vec(), event_receiver);
101+
102+
let monitor_reception_handle = monitor_thread(
103+
"received_on".to_string(),
104+
configuration.clone(),
105+
monitoring_receiver,
106+
);
107+
108+
let analysis_pool = threadpool::ThreadPool::with_name("Analysis".to_string(), thread_count);
109+
110+
let (analyser_sender, analyser_receiver) = unbounded();
111+
for _ in 0..thread_count {
112+
let rx = item_receiver.clone();
113+
let tx = analyser_sender.clone();
114+
let configuration_clone = configuration.clone();
115+
let context_clone = context.clone();
116+
let seq_num_clone = sequence_number.clone();
117+
analysis_pool.execute(move || {
118+
info!("Starting analyser generation...");
119+
trace!("Analyser generation closure entering...");
120+
let mut analyser = A::new(configuration_clone, context_clone, seq_num_clone);
121+
loop {
122+
match rx.recv() {
123+
Ok(item) => {
124+
for publish_item in analyser.analyze(item.clone()) {
125+
let cause = Cause::from_exchange(&(item.payload));
126+
match tx.send((publish_item, cause)) {
127+
Ok(()) => trace!("Analyser sent"),
128+
Err(error) => {
129+
error!("Stopped to send analyser: {}", error);
130+
break;
131+
}
129132
}
130133
}
134+
trace!("Analyser generation closure finished");
135+
break;
136+
}
137+
Err(recv_error) => {
138+
info!("Exiting analysis thread: {}", recv_error);
139+
break;
131140
}
132-
trace!("Analyser generation closure finished");
133-
break;
134-
}
135-
Err(recv_error) => {
136-
info!("Exiting analysis thread: {}", recv_error);
137-
break;
138141
}
139142
}
140-
}
141-
});
143+
});
144+
}
145+
// Drop the original sender as only clones in threads remains
146+
drop(analyser_sender);
147+
148+
let (publish_item_receiver, publish_monitoring_receiver, filter_handle) =
149+
filter_thread::<T>(configuration.clone(), analyser_receiver);
150+
151+
let reader_configure_handle =
152+
reader_configure_thread(configuration.clone(), information_receiver);
153+
154+
let monitor_publish_handle = monitor_thread(
155+
"sent_on".to_string(),
156+
configuration,
157+
publish_monitoring_receiver,
158+
);
159+
160+
mqtt_client_publish(publish_item_receiver, &mut mqtt_client).await;
161+
162+
debug!("Start mqtt_client_listen_handler joining...");
163+
mqtt_client_listen_handle.await.unwrap();
164+
debug!("Start mqtt_router_dispatch_handler joining...");
165+
mqtt_router_dispatch_handle.join().unwrap();
166+
debug!("Start monitor_reception_handle joining...");
167+
monitor_reception_handle.join().unwrap();
168+
debug!("Start reader_configure_handler joining...");
169+
reader_configure_handle.join().unwrap();
170+
debug!("Start analyser_generate_handler joining...");
171+
analysis_pool.join();
172+
debug!("Start filter_handle joining...");
173+
filter_handle.join().unwrap();
174+
debug!("Start monitor_publish_handle joining...");
175+
monitor_publish_handle.join().unwrap();
176+
177+
warn!("Loop done");
178+
tokio::time::sleep(Duration::from_secs(5)).await;
142179
}
143-
// Drop the original sender as only clones in threads remains
144-
drop(analyser_sender);
145-
146-
let (publish_item_receiver, publish_monitoring_receiver, filter_handle) =
147-
filter_thread::<T>(configuration.clone(), analyser_receiver);
148-
149-
let reader_configure_handle =
150-
reader_configure_thread(configuration.clone(), information_receiver);
151-
152-
let monitor_publish_handle = monitor_thread(
153-
"sent_on".to_string(),
154-
configuration,
155-
publish_monitoring_receiver,
156-
);
157-
158-
mqtt_client_publish(publish_item_receiver, &mut mqtt_client).await;
159-
160-
debug!("Start mqtt_client_listen_handler joining...");
161-
mqtt_client_listen_handle.await.unwrap();
162-
debug!("Start mqtt_router_dispatch_handler joining...");
163-
mqtt_router_dispatch_handle.join().unwrap();
164-
debug!("Start monitor_reception_handle joining...");
165-
monitor_reception_handle.join().unwrap();
166-
debug!("Start reader_configure_handler joining...");
167-
reader_configure_handle.join().unwrap();
168-
debug!("Start analyser_generate_handler joining...");
169-
analysis_pool.join();
170-
debug!("Start filter_handle joining...");
171-
filter_handle.join().unwrap();
172-
debug!("Start monitor_publish_handle joining...");
173-
monitor_publish_handle.join().unwrap();
174-
175-
warn!("Loop done");
176-
tokio::time::sleep(Duration::from_secs(5)).await;
177180
}
178181

179182
fn filter_thread<T>(

0 commit comments

Comments
 (0)