Skip to content

Commit ef00f3a

Browse files
committed
fix: added function as a consumer for a message
1 parent 46495b3 commit ef00f3a

File tree

1 file changed

+50
-25
lines changed

1 file changed

+50
-25
lines changed

src/flow_queue/service.rs

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -188,19 +188,19 @@ impl RabbitmqClient {
188188
}
189189

190190
// Function intended to get used by the runtime
191-
pub async fn consume_message(
191+
pub async fn receive_messages(
192192
&self,
193193
queue_name: &str,
194-
ack_on_success: bool,
195-
) -> Result<Message, RabbitMqError> {
194+
handle_message: fn(Message) -> Result<Message, lapin::Error>,
195+
) -> Result<(), lapin::Error> {
196196
let mut consumer = {
197197
let channel = self.channel.lock().await;
198198

199199
let consumer_res = channel
200200
.basic_consume(
201201
queue_name,
202202
"consumer",
203-
lapin::options::BasicConsumeOptions::default(),
203+
BasicConsumeOptions::default(),
204204
FieldTable::default(),
205205
)
206206
.await;
@@ -211,42 +211,67 @@ impl RabbitmqClient {
211211
}
212212
};
213213

214-
debug!("Starting to consume from {}", queue_name);
214+
println!("Starting to consume from {}", queue_name);
215215

216-
while let Some(delivery_result) = consumer.next().await {
217-
let delivery = match delivery_result {
216+
while let Some(delivery) = consumer.next().await {
217+
let delivery = match delivery {
218218
Ok(del) => del,
219-
Err(_) => return Err(RabbitMqError::DeserializationError),
219+
Err(err) => {
220+
println!("Error receiving message: {}", err);
221+
return Err(err);
222+
}
220223
};
224+
221225
let data = &delivery.data;
222226
let message_str = match std::str::from_utf8(&data) {
223-
Ok(str) => str,
224-
Err(_) => {
225-
return Err(RabbitMqError::DeserializationError);
227+
Ok(str) => {
228+
println!("Received message: {}", str);
229+
str
230+
}
231+
Err(err) => {
232+
println!("Error decoding message: {}", err);
233+
return Ok(());
226234
}
227235
};
228-
229-
debug!("Received message: {}", message_str);
230-
231236
// Parse the message
232237
let message = match serde_json::from_str::<Message>(message_str) {
233-
Ok(m) => m,
234-
Err(e) => {
235-
log::error!("Failed to parse message: {}", e);
236-
return Err(RabbitMqError::DeserializationError);
238+
Ok(mess) => {
239+
println!("Parsed message with telegram_id: {}", mess.message_id);
240+
mess
241+
}
242+
Err(err) => {
243+
println!("Error parsing message: {}", err);
244+
return Ok(());
237245
}
238246
};
239247

240-
if ack_on_success {
241-
delivery
242-
.ack(lapin::options::BasicAckOptions::default())
243-
.await
244-
.expect("Failed to acknowledge message");
248+
let message = match handle_message(message) {
249+
Ok(mess) => {
250+
println!("Handled message with telegram_id: {}", mess.message_id);
251+
mess
252+
}
253+
Err(err) => {
254+
println!("Error handling message: {}", err);
255+
return Ok(());
256+
}
257+
};
258+
259+
let message_json = serde_json::to_string(&message).unwrap();
260+
261+
println!("{}", message_json);
262+
263+
{
264+
self.send_message(message_json, "recieve_queue").await;
245265
}
246266

247-
return Ok(message);
267+
// Acknowledge the message
268+
delivery
269+
.ack(BasicAckOptions::default())
270+
.await
271+
.expect("Failed to acknowledge message");
248272
}
249-
Err(RabbitMqError::DeserializationError)
273+
274+
Ok(())
250275
}
251276

252277
// Receive messages from a queue with timeout

0 commit comments

Comments
 (0)