Skip to content

Commit 3b22350

Browse files
committed
feat: added queue declaration & sending messages into flow channels
1 parent 2b32f8c commit 3b22350

File tree

1 file changed

+109
-0
lines changed

1 file changed

+109
-0
lines changed

src/flow_queue/handler.rs

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
use crate::flow_queue::connection::{get_flow_channel, FlowChannel, FlowQueue};
2+
use crate::flow_queue::name::{QueueName, QueuePrefix, QueueProtocol};
3+
use lapin::message::{Delivery, DeliveryResult};
4+
use lapin::options::{BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions};
5+
use lapin::protocol::basic::gen_return;
6+
use lapin::types::FieldTable;
7+
use lapin::{ConsumerDelegate, Error};
8+
use log::{debug, error, info};
9+
use std::future::Future;
10+
use std::pin::Pin;
11+
12+
/// # Declares all given queues
13+
///
14+
/// ## Expected behavior
15+
/// If a queue cannot be created, the services stops
16+
pub async fn declare_queues(flow_channel: FlowChannel, names: Vec<QueueName>) {
17+
let channel_arc = flow_channel.lock().await;
18+
for name in names {
19+
let channel_name = name.prefix + name.protocol;
20+
let queue_result = channel_arc
21+
.queue_declare(
22+
&*channel_name,
23+
QueueDeclareOptions::default(),
24+
FieldTable::default(),
25+
)
26+
.await;
27+
28+
match queue_result {
29+
Ok(_) => {
30+
info!("Declared queue: {}", channel_name)
31+
}
32+
Err(error) => {
33+
let str = format!("Cannot declare queue: {}, Reason: {}", channel_name, error);
34+
error!(str);
35+
panic!(str)
36+
}
37+
};
38+
}
39+
}
40+
41+
pub async fn send_message(
42+
flow_channel: FlowChannel,
43+
queue_name: QueueName,
44+
payload: &Vec<u8>,
45+
) -> Result<bool, Error> {
46+
let channel_arc = flow_channel.lock().await;
47+
let name = queue_name.prefix + queue_name.protocol;
48+
49+
let result = channel_arc
50+
.basic_publish(
51+
"",
52+
&*name,
53+
BasicPublishOptions::default(),
54+
payload,
55+
Default::default(),
56+
)
57+
.await;
58+
59+
match result {
60+
Ok(_) => Ok(true),
61+
Err(error) => Err(error),
62+
}
63+
}
64+
65+
pub async fn consume_message(channel: FlowChannel, queue_protocol: QueueProtocol) {
66+
let name = QueuePrefix::Send + queue_protocol;
67+
let channel_arc = channel.lock().await;
68+
69+
let mut consumer = channel_arc
70+
.basic_consume(
71+
&*name,
72+
"my_consumer",
73+
BasicConsumeOptions::default(),
74+
FieldTable::default(),
75+
)
76+
.await
77+
.unwrap();
78+
79+
consumer.set_delegate(SendQueueDelegate);
80+
}
81+
82+
struct SendQueueDelegate;
83+
84+
impl ConsumerDelegate for SendQueueDelegate {
85+
fn on_new_delivery(
86+
&self,
87+
delivery: DeliveryResult,
88+
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
89+
let optional_delivery = match delivery {
90+
Ok(option) => option,
91+
Err(error) => {
92+
todo!("")
93+
}
94+
};
95+
96+
let delivery = match optional_delivery {
97+
Some(del) => del,
98+
None => {
99+
todo!("")
100+
}
101+
};
102+
103+
todo!()
104+
}
105+
106+
fn drop_prefetched_messages(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
107+
todo!()
108+
}
109+
}

0 commit comments

Comments
 (0)