Skip to content

Commit febd0bf

Browse files
committed
WIP
Signed-off-by: Nicolas Buffon <nicolas.buffon@orange.com>
1 parent a962a88 commit febd0bf

2 files changed

Lines changed: 17 additions & 15 deletions

File tree

rust/src/client/application/pipeline.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,9 @@ where
362362
for topic in topic_list.iter() {
363363
match topic {
364364
info_topic if info_topic.to_string().contains(Information::TYPE) => {
365-
router.add_route(info_topic.clone(), deserialize::<Information>);
365+
router.add_route(info_topic.clone(), deserialize::<T, Information>);
366366
}
367-
_ => router.add_route(topic.clone(), deserialize::<Exchange>),
367+
_ => router.add_route(topic.clone(), deserialize::<T, Exchange>),
368368
}
369369
}
370370

@@ -424,15 +424,15 @@ where
424424
)
425425
}
426426

427-
fn deserialize<T>(publish: rumqttc::v5::mqttbytes::v5::Publish) -> Option<BoxedReception>
427+
fn deserialize<T, P>(publish: rumqttc::v5::mqttbytes::v5::Publish) -> Option<Packet<T, P>>
428428
where
429-
T: DeserializeOwned + Payload + 'static + Send,
429+
T: Topic + 'static,
430+
P: DeserializeOwned + Payload + 'static + Send,
430431
{
431-
// Incoming publish from the broker
432432
match String::from_utf8(publish.payload.to_vec()) {
433433
Ok(message) => {
434434
let message_str = message.as_str();
435-
match serde_json::from_str::<T>(message_str) {
435+
match serde_json::from_str::<P>(message_str) {
436436
Ok(message) => {
437437
trace!("message parsed");
438438
return Some((Box::new(message), publish.properties.unwrap_or_default()));

rust/src/transport/mqtt/mqtt_router.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,32 @@ use rumqttc::v5::{Event, Incoming};
1818
use crate::transport::mqtt::topic::Topic;
1919
use std::any::{type_name, Any};
2020
use std::str::from_utf8;
21+
use crate::transport::packet::Packet;
22+
use crate::transport::payload::Payload;
2123

22-
pub type BoxedReception = (Box<dyn Any + 'static + Send>, PublishProperties);
24+
// pub type BoxedReception = (Box<dyn Any + 'static + Send>, PublishProperties);
2325

24-
type BoxedCallback = Box<dyn Fn(Publish) -> Option<BoxedReception>>;
26+
// type BoxedCallback = Box<dyn Fn(Publish) -> Option<Packet<dyn Topic, dyn Payload>>>;
2527

2628
#[cfg(feature = "telemetry")]
2729
use crate::transport::telemetry::get_reception_mqtt_span;
2830

2931
#[derive(Default)]
30-
pub struct MqttRouter {
31-
route_map: HashMap<String, BoxedCallback>,
32+
pub struct MqttRouter<T: Topic> {
33+
route_map: HashMap<String, Box<dyn Fn(Publish) -> Option<Packet<T, dyn Any + Send + 'static>>>>,
3234
}
3335

34-
impl MqttRouter {
35-
pub fn add_route<T, C>(&mut self, topic: T, callback: C)
36+
impl<T> MqttRouter<T> {
37+
pub fn add_route<P, C>(&mut self, topic: T, callback: C)
3638
where
37-
T: Topic,
38-
C: Fn(Publish) -> Option<BoxedReception> + 'static,
39+
P: Payload,
40+
C: Fn(Publish) -> Option<Packet<T, P>> + 'static,
3941
{
4042
self.route_map.insert(topic.as_route(), Box::new(callback));
4143
info!("Registered route for topic: {}", topic.as_route());
4244
}
4345

44-
pub fn handle_event<T: Topic>(&mut self, event: Event) -> Option<(T, BoxedReception)> {
46+
pub fn handle_event<P: Payload>(&mut self, event: Event) -> Option<Packet<T, P>> {
4547
match event {
4648
Event::Incoming(incoming) => match incoming {
4749
Incoming::Publish(publish) => {

0 commit comments

Comments
 (0)