@@ -260,28 +260,59 @@ async def run_route_config(self, route: Dict[str, Any], source: Source, destinat
260260 destination .sent_messages = []
261261
262262 # Process messages from source
263- async for message in source .receive ():
264- if not self ._is_running :
265- break
266-
267- try :
268- # Process message through all processors
269- processed_message = message
270- for processor in processors :
271- try :
272- processed_message = await processor .process (processed_message )
273- if processed_message is None :
274- break # Message was filtered out
275- except Exception as e :
276- self .log (f"Error in processor { type (processor ).__name__ } in route { route_name } : { e } " )
277- processed_message = None
278- break
279-
280- # Send to destination if not filtered
281- if processed_message is not None :
282- try :
283- await destination .send (processed_message )
284- # Append to sent_messages for testing
263+ try :
264+ async for message in source .receive ():
265+ if not self ._is_running :
266+ self .log (f"🛑 Stopping route { route_name } (engine shutdown)" )
267+ break
268+
269+ try :
270+ # Process message through all processors
271+ processed_message = message
272+ for processor in processors :
273+ try :
274+ if hasattr (processor , 'process' ) and callable (processor .process ):
275+ self .log (f"⚙️ Processing message with { type (processor ).__name__ } " )
276+ processed_message = await processor .process (processed_message )
277+ if processed_message is None :
278+ self .log ("ℹ️ Message filtered out by processor" )
279+ break # Message was filtered out
280+ except Exception as e :
281+ self .log (f"❌ Error in processor { type (processor ).__name__ } in route { route_name } : { e } " )
282+ if self .verbose :
283+ import traceback
284+ self .log (traceback .format_exc ())
285+ processed_message = None
286+ break
287+
288+ # Send to destination if not filtered
289+ if processed_message is not None :
290+ try :
291+ self .log (f"📤 Sending message to destination: { type (destination ).__name__ } " )
292+ if hasattr (destination , 'send' ) and callable (destination .send ):
293+ await destination .send (processed_message )
294+ # Store sent messages for testing/verification
295+ if hasattr (destination , 'sent_messages' ) and isinstance (destination .sent_messages , list ):
296+ destination .sent_messages .append (processed_message )
297+ except Exception as e :
298+ self .log (f"❌ Error sending message to destination in route { route_name } : { e } " )
299+ if self .verbose :
300+ import traceback
301+ self .log (traceback .format_exc ())
302+ except Exception as e :
303+ self .log (f"❌ Error processing message in route { route_name } : { e } " )
304+ if self .verbose :
305+ import traceback
306+ self .log (traceback .format_exc ())
307+ except asyncio .CancelledError :
308+ self .log (f"🛑 Route { route_name } was cancelled" )
309+ raise
310+ except Exception as e :
311+ self .log (f"❌ Error in message loop for route { route_name } : { e } " )
312+ if self .verbose :
313+ import traceback
314+ self .log (traceback .format_exc ())
315+ raise
285316 destination .sent_messages .append (processed_message )
286317 except Exception as e :
287318 self .log (f"Error sending message to destination in route { route_name } : { e } " )
0 commit comments