Skip to content

Commit b2f1630

Browse files
committed
feat: made generic delegate for easy implementation of new handlers
1 parent 611d8e7 commit b2f1630

File tree

2 files changed

+100
-22
lines changed

2 files changed

+100
-22
lines changed

src/flow_queue/delegate.rs

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,68 @@
1-
use lapin::message::{DeliveryResult};
1+
use lapin::message::{Delivery, DeliveryResult};
22
use lapin::ConsumerDelegate;
3+
use log::debug;
34
use std::future::Future;
45
use std::pin::Pin;
56

6-
struct QueueDelegate;
7+
/// Delegate trait to implement.
8+
///
9+
/// Use as delegate for RabbitMQ
10+
///
11+
/// # Example
12+
/// ```
13+
/// use lapin::message::Delivery;
14+
/// use code0_flow::flow_queue::delegate::Delegate;
15+
///
16+
/// struct HttpDelegate;
17+
///
18+
/// impl Delegate for HttpDelegate {
19+
/// fn handle_delivery(&self, delivery: Delivery) {
20+
/// todo!("Handle delivery!")
21+
/// }
22+
/// }
23+
/// ```
24+
pub trait Delegate {
25+
fn handle_delivery(&self, delivery: Delivery);
26+
}
27+
28+
pub struct QueueDelegate<T: Delegate> {
29+
pub delegate: T,
30+
}
31+
32+
impl<T: Delegate> QueueDelegate<T> {
33+
pub fn new(delegate: T) -> Self {
34+
QueueDelegate { delegate }
35+
}
36+
37+
pub fn deliver(&self, delivery: Delivery) {
38+
self.delegate.handle_delivery(delivery);
39+
}
40+
}
741

8-
impl ConsumerDelegate for QueueDelegate {
42+
impl<T: Delegate> ConsumerDelegate for QueueDelegate<T> {
943
fn on_new_delivery(
1044
&self,
1145
delivery: DeliveryResult,
1246
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
13-
let optional_delivery = match delivery {
14-
Ok(option) => option,
15-
Err(error) => {
16-
todo!("error handling")
17-
}
18-
};
47+
async move {
48+
let optional_delivery = match delivery {
49+
Ok(option) => option,
50+
Err(_) => return,
51+
};
52+
let delivery = match optional_delivery {
53+
Some(del) => del,
54+
None => return,
55+
};
1956

20-
let delivery = match optional_delivery {
21-
Some(del) => del,
22-
None => {
23-
todo!("error handling")
24-
}
25-
};
26-
todo!("consumer shoud consume the data of delivy as &Vec<u8>")
57+
self.delegate.handle_delivery(delivery);
58+
}
2759
}
2860

2961
fn drop_prefetched_messages(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
30-
todo!("")
62+
let future = async move {
63+
debug!("Dropping prefetched messages...");
64+
};
65+
66+
Box::pin(future)
3167
}
3268
}

src/flow_queue/handler.rs

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::flow_queue::connection::{get_flow_channel, FlowChannel, FlowQueue};
2+
use crate::flow_queue::delegate::{Delegate, QueueDelegate};
23
use crate::flow_queue::name::{QueueName, QueuePrefix, QueueProtocol};
34
use lapin::message::{Delivery, DeliveryResult};
45
use lapin::options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions};
@@ -36,6 +37,7 @@ pub async fn declare_queues(flow_channel: FlowChannel, names: Vec<QueueName>) {
3637
}
3738
}
3839

40+
/// Sends a message into a queue
3941
pub async fn send_message(
4042
flow_channel: FlowChannel,
4143
queue_name: QueueName,
@@ -60,19 +62,59 @@ pub async fn send_message(
6062
}
6163
}
6264

63-
pub async fn consume_message(channel: FlowChannel, queue_protocol: QueueProtocol) {
64-
let name = QueuePrefix::Send + queue_protocol;
65+
/// Consumes a message
66+
///
67+
/// Creates a delegate that waits on messages and consumes them.
68+
///
69+
/// # Params
70+
/// - channel: FlowChannel of the send message
71+
/// - queue_name: Name of the Queue that should be listened to
72+
/// - delegate: Consumer delegate of the message
73+
///
74+
/// # Example
75+
/// ```
76+
/// use lapin::message::Delivery;
77+
/// use code0_flow::flow_queue::delegate::Delegate;
78+
/// use code0_flow::flow_queue::connection::get_flow_channel;
79+
/// use code0_flow::flow_queue::name::{QueueName, QueuePrefix, QueueProtocol};
80+
/// use code0_flow::flow_queue::handler::consume_message;
81+
///
82+
/// struct HttpDelegate;
83+
///
84+
/// impl Delegate for HttpDelegate {
85+
/// fn handle_delivery(&self, delivery: Delivery) {
86+
/// todo!("Handle delivery!")
87+
/// }
88+
/// }
89+
///
90+
/// async fn main() {
91+
/// let uri = "abc";
92+
/// let channel = get_flow_channel(uri).await;
93+
/// let queue_name = QueueName {
94+
/// prefix: QueuePrefix::Send,
95+
/// protocol: QueueProtocol::Rest,
96+
/// };
97+
///
98+
/// consume_message(channel, queue_name, HttpDelegate).await;
99+
/// }
100+
/// ```
101+
pub async fn consume_message<T: Delegate>(
102+
channel: FlowChannel,
103+
queue_name: QueueName,
104+
delegate: T,
105+
) {
106+
let name = queue_name.prefix + queue_name.protocol;
65107
let channel_arc = channel.lock().await;
66108

67109
let mut consumer = channel_arc
68110
.basic_consume(
69111
&*name,
70-
"my_consumer",
112+
"",
71113
BasicConsumeOptions::default(),
72114
FieldTable::default(),
73115
)
74116
.await
75117
.unwrap();
76118

77-
consumer.set_delegate(SendQueueDelegate);
78-
}
119+
consumer.set_delegate(QueueDelegate { delegate });
120+
}

0 commit comments

Comments
 (0)