From b8f52aed3c1f0395288f3a0df0c25bef3f59cef1 Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 5 Aug 2025 17:16:22 +0530 Subject: [PATCH 1/9] test --- .../single_threaded_multi_queue_listener.py | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index ce13c63..fe7abcb 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -38,19 +38,32 @@ def on_channel_open(self, channel: PikaChannel): def on_message(ch: PikaChannel, method: PikaBasic.Deliver, properties: PikaBasicProperties, body: bytes, args): - message = json.loads(body.decode('utf8')) - loop = asyncio.get_event_loop() (handler,) = args - loop.run_until_complete( - handler.async_handle_message( - method=method, properties=properties, message=message - ) - ) - if ch.is_open: - logging.debug('Channel is open, acknowledging the message') - ch.basic_ack(delivery_tag=method.delivery_tag) - else: - logging.warning('Channel closed unable to ack message') + message = json.loads(body.decode('utf8')) + + async def handle_and_ack(): + try: + await handler.async_handle_message( + method=method, properties=properties, message=message + ) + if ch.is_open: + logging.debug('Channel is open, acknowledging the message') + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + logging.warning('Channel closed, unable to ack message') + except Exception as e: + logging.exception('Error while handling message: %s', e) + # Optional: Nack the message if you don't want it retried automatically + if ch.is_open: + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) + raise + + try: + loop = asyncio.get_event_loop() + loop.create_task(handle_and_ack()) + except RuntimeError as e: + logging.exception("No running event loop: %s", e) + raise channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) for listen_queue_config in self.listen_queue_configs: From a256348367bed2b693f9ce614ad040e9f65162ad Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 5 Aug 2025 18:03:42 +0530 Subject: [PATCH 2/9] test --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 2c35ffe..c814082 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ 'pika which is the officially recommended client for Rabbit MQ.', install_requires=[ 'pika==1.3.2', - 'pydantic==2.11.3', - 'starlette==0.46.2' + 'pydantic==2.6.4', + 'starlette==0.36.3' ] ) From 18d06b1fca74250634fa00e777a7d366d2320e6f Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 5 Aug 2025 18:59:36 +0530 Subject: [PATCH 3/9] message not getting ack, no logs fix --- .../single_threaded_multi_queue_listener.py | 48 ++++++++++++------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index fe7abcb..53ab370 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -2,6 +2,7 @@ import functools import json import logging +import threading from pika import SelectConnection from pika.channel import Channel as PikaChannel @@ -16,64 +17,75 @@ class SingleThreadedMultiQueueListener(object): """ - This is a single threaded multi queue listener, as opposed the other multi queue listener in the library. - this listener does not implement threads, which enables us to trigger async methods while handling the messages + This is a single-threaded, multi-queue RabbitMQ consumer that supports async message handlers. + It avoids threads for message processing, enabling async API calls (like PDF generation). """ def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[ListenQueueConfig]): logging.debug('Creating connection object and registering callbacks') - self.connection = get_async_connection(broker_config=broker_config, - on_connection_open=self.on_connection_open, - on_connection_closed=self.on_connection_closed) + self.connection = get_async_connection( + broker_config=broker_config, + on_connection_open=self.on_connection_open, + on_connection_closed=self.on_connection_closed + ) self.listen_queue_configs = listen_queue_configs + # Set up a background asyncio event loop for async message handlers + self.loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread(target=self._start_loop, daemon=True) + self.loop_thread.start() + + def _start_loop(self): + asyncio.set_event_loop(self.loop) + self.loop.run_forever() + def on_connection_open(self, connection: SelectConnection): connection.channel(on_open_callback=self.on_channel_open) def on_connection_closed(self, connection, reason): logging.exception('Connection closed: %s', reason) - raise Exception('Connection closed: %s', reason) + raise Exception(f'Connection closed: {reason}') def on_channel_open(self, channel: PikaChannel): channel.add_on_close_callback(self.on_channel_closed) + # Set prefetch to 1 to process one message at a time + channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) + def on_message(ch: PikaChannel, method: PikaBasic.Deliver, properties: PikaBasicProperties, body: bytes, args): (handler,) = args + logging.debug("Received message: %s", body) message = json.loads(body.decode('utf8')) async def handle_and_ack(): try: + logging.debug("Starting async handler for delivery_tag=%s", method.delivery_tag) await handler.async_handle_message( method=method, properties=properties, message=message ) if ch.is_open: - logging.debug('Channel is open, acknowledging the message') + logging.debug('Acknowledging message with delivery_tag=%s', method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) else: - logging.warning('Channel closed, unable to ack message') + logging.warning('Channel closed before ack') except Exception as e: logging.exception('Error while handling message: %s', e) - # Optional: Nack the message if you don't want it retried automatically if ch.is_open: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) - raise + raise # crash the consumer - try: - loop = asyncio.get_event_loop() - loop.create_task(handle_and_ack()) - except RuntimeError as e: - logging.exception("No running event loop: %s", e) - raise + # Submit task to background asyncio loop + self.loop.call_soon_threadsafe(lambda: asyncio.ensure_future(handle_and_ack(), loop=self.loop)) - channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) for listen_queue_config in self.listen_queue_configs: queue_name: str = listen_queue_config.name on_message_callback = functools.partial(on_message, args=(listen_queue_config.handler,)) channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False) + logging.info("Consuming from queue: %s", queue_name) def on_channel_closed(self, ch, reason): logging.exception('Channel %i was closed: %s', ch, reason) - raise Exception('Channel %i was closed: %s', ch, reason) + raise Exception(f'Channel {ch} was closed: {reason}') def listen(self): try: From d54bf8e820c2925aaac2d73d01af68c96f266331 Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 5 Aug 2025 19:25:40 +0530 Subject: [PATCH 4/9] check the prev --- .../single_threaded_multi_queue_listener.py | 60 ++++++------------- 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index 53ab370..cf9508c 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -2,7 +2,6 @@ import functools import json import logging -import threading from pika import SelectConnection from pika.channel import Channel as PikaChannel @@ -17,75 +16,52 @@ class SingleThreadedMultiQueueListener(object): """ - This is a single-threaded, multi-queue RabbitMQ consumer that supports async message handlers. - It avoids threads for message processing, enabling async API calls (like PDF generation). + This is a single threaded multi queue listener, as opposed the other multi queue listener in the library. + this listener does not implement threads, which enables us to trigger async methods while handling the messages """ def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[ListenQueueConfig]): logging.debug('Creating connection object and registering callbacks') - self.connection = get_async_connection( - broker_config=broker_config, - on_connection_open=self.on_connection_open, - on_connection_closed=self.on_connection_closed - ) + self.connection = get_async_connection(broker_config=broker_config, + on_connection_open=self.on_connection_open, + on_connection_closed=self.on_connection_closed) self.listen_queue_configs = listen_queue_configs - # Set up a background asyncio event loop for async message handlers - self.loop = asyncio.new_event_loop() - self.loop_thread = threading.Thread(target=self._start_loop, daemon=True) - self.loop_thread.start() - - def _start_loop(self): - asyncio.set_event_loop(self.loop) - self.loop.run_forever() - def on_connection_open(self, connection: SelectConnection): connection.channel(on_open_callback=self.on_channel_open) def on_connection_closed(self, connection, reason): logging.exception('Connection closed: %s', reason) - raise Exception(f'Connection closed: {reason}') + raise Exception('Connection closed: %s', reason) def on_channel_open(self, channel: PikaChannel): channel.add_on_close_callback(self.on_channel_closed) - # Set prefetch to 1 to process one message at a time - channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) - def on_message(ch: PikaChannel, method: PikaBasic.Deliver, properties: PikaBasicProperties, body: bytes, args): - (handler,) = args - logging.debug("Received message: %s", body) message = json.loads(body.decode('utf8')) + (handler,) = args async def handle_and_ack(): - try: - logging.debug("Starting async handler for delivery_tag=%s", method.delivery_tag) - await handler.async_handle_message( - method=method, properties=properties, message=message - ) - if ch.is_open: - logging.debug('Acknowledging message with delivery_tag=%s', method.delivery_tag) - ch.basic_ack(delivery_tag=method.delivery_tag) - else: - logging.warning('Channel closed before ack') - except Exception as e: - logging.exception('Error while handling message: %s', e) - if ch.is_open: - ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) - raise # crash the consumer + await handler.async_handle_message( + method=method, properties=properties, message=message + ) + if ch.is_open: + logging.debug('Channel is open, acknowledging the message') + ch.basic_ack(delivery_tag=method.delivery_tag) + else: + logging.warning('Channel closed, unable to ack message') - # Submit task to background asyncio loop - self.loop.call_soon_threadsafe(lambda: asyncio.ensure_future(handle_and_ack(), loop=self.loop)) + asyncio.run(handle_and_ack()) + channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) for listen_queue_config in self.listen_queue_configs: queue_name: str = listen_queue_config.name on_message_callback = functools.partial(on_message, args=(listen_queue_config.handler,)) channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False) - logging.info("Consuming from queue: %s", queue_name) def on_channel_closed(self, ch, reason): logging.exception('Channel %i was closed: %s', ch, reason) - raise Exception(f'Channel {ch} was closed: {reason}') + raise Exception('Channel %i was closed: %s', ch, reason) def listen(self): try: From 24368b96af1b5329a42c4f9edd071db04d87539c Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 5 Aug 2025 19:27:50 +0530 Subject: [PATCH 5/9] get_event_loop --- .../single_threaded_multi_queue_listener.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index cf9508c..489192c 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -51,7 +51,8 @@ async def handle_and_ack(): else: logging.warning('Channel closed, unable to ack message') - asyncio.run(handle_and_ack()) + loop = asyncio.get_event_loop() + loop.create_task(handle_and_ack()) channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) for listen_queue_config in self.listen_queue_configs: From 436d9a7ca895846b61678f547e10d752f8deec6a Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 5 Aug 2025 20:18:11 +0530 Subject: [PATCH 6/9] test complete change --- .../single_threaded_multi_queue_listener.py | 125 ++++++++++++++++-- 1 file changed, 111 insertions(+), 14 deletions(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index 489192c..a83c529 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -2,6 +2,7 @@ import functools import json import logging +import threading from pika import SelectConnection from pika.channel import Channel as PikaChannel @@ -14,55 +15,71 @@ from rabbitmq_client.single_threaded_consumer.schemas import ListenQueueConfig -class SingleThreadedMultiQueueListener(object): +class SingleThreadedMultiQueueListener: """ - This is a single threaded multi queue listener, as opposed the other multi queue listener in the library. - this listener does not implement threads, which enables us to trigger async methods while handling the messages + This is a single-threaded, multi-queue RabbitMQ consumer that supports async message handlers. + It avoids threads for message processing, enabling async API calls (like PDF generation). """ def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[ListenQueueConfig]): logging.debug('Creating connection object and registering callbacks') - self.connection = get_async_connection(broker_config=broker_config, - on_connection_open=self.on_connection_open, - on_connection_closed=self.on_connection_closed) + self.connection = get_async_connection( + broker_config=broker_config, + on_connection_open=self.on_connection_open, + on_connection_closed=self.on_connection_closed + ) self.listen_queue_configs = listen_queue_configs + # Set up a background asyncio event loop for async message handlers + self.loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread(target=self._start_loop, daemon=True) + self.loop_thread.start() + + def _start_loop(self): + asyncio.set_event_loop(self.loop) + self.loop.run_forever() + def on_connection_open(self, connection: SelectConnection): connection.channel(on_open_callback=self.on_channel_open) def on_connection_closed(self, connection, reason): logging.exception('Connection closed: %s', reason) - raise Exception('Connection closed: %s', reason) + raise Exception(f'Connection closed: {reason}') def on_channel_open(self, channel: PikaChannel): channel.add_on_close_callback(self.on_channel_closed) + # Set prefetch to 1 to process one message at a time + channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) + def on_message(ch: PikaChannel, method: PikaBasic.Deliver, properties: PikaBasicProperties, body: bytes, args): - message = json.loads(body.decode('utf8')) (handler,) = args + logging.debug("Received message: %s", body) + message = json.loads(body.decode('utf8')) async def handle_and_ack(): + logging.debug("Starting async handler for delivery_tag=%s", method.delivery_tag) await handler.async_handle_message( method=method, properties=properties, message=message ) if ch.is_open: - logging.debug('Channel is open, acknowledging the message') + logging.debug('Acknowledging message with delivery_tag=%s', method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag) else: - logging.warning('Channel closed, unable to ack message') + logging.warning('Channel closed before ack') - loop = asyncio.get_event_loop() - loop.create_task(handle_and_ack()) + # Submit task to background asyncio loop + self.loop.call_soon_threadsafe(lambda: asyncio.ensure_future(handle_and_ack(), loop=self.loop)) - channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) for listen_queue_config in self.listen_queue_configs: queue_name: str = listen_queue_config.name on_message_callback = functools.partial(on_message, args=(listen_queue_config.handler,)) channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False) + logging.info("Consuming from queue: %s", queue_name) def on_channel_closed(self, ch, reason): logging.exception('Channel %i was closed: %s', ch, reason) - raise Exception('Channel %i was closed: %s', ch, reason) + raise Exception(f'Channel {ch} was closed: {reason}') def listen(self): try: @@ -76,3 +93,83 @@ def listen(self): set_consumer_status(is_healthy=False) self.connection.close() raise ex + + +# import asyncio +# import functools +# import json +# import logging +# +# from pika import SelectConnection +# from pika.channel import Channel as PikaChannel +# from pika.spec import Basic as PikaBasic, BasicProperties as PikaBasicProperties +# +# from rabbitmq_client.async_connection.connection import get_async_connection +# from rabbitmq_client.broker_config import BrokerConfig +# from rabbitmq_client.config import settings +# from rabbitmq_client.consumer.health_utils import set_consumer_status +# from rabbitmq_client.single_threaded_consumer.schemas import ListenQueueConfig +# +# +# class SingleThreadedMultiQueueListener(object): +# """ +# This is a single threaded multi queue listener, as opposed the other multi queue listener in the library. +# this listener does not implement threads, which enables us to trigger async methods while handling the messages +# """ +# def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[ListenQueueConfig]): +# logging.debug('Creating connection object and registering callbacks') +# self.connection = get_async_connection(broker_config=broker_config, +# on_connection_open=self.on_connection_open, +# on_connection_closed=self.on_connection_closed) +# self.listen_queue_configs = listen_queue_configs +# +# def on_connection_open(self, connection: SelectConnection): +# connection.channel(on_open_callback=self.on_channel_open) +# +# def on_connection_closed(self, connection, reason): +# logging.exception('Connection closed: %s', reason) +# raise Exception('Connection closed: %s', reason) +# +# def on_channel_open(self, channel: PikaChannel): +# channel.add_on_close_callback(self.on_channel_closed) +# +# def on_message(ch: PikaChannel, method: PikaBasic.Deliver, +# properties: PikaBasicProperties, body: bytes, args): +# message = json.loads(body.decode('utf8')) +# (handler,) = args +# +# async def handle_and_ack(): +# await handler.async_handle_message( +# method=method, properties=properties, message=message +# ) +# if ch.is_open: +# logging.debug('Channel is open, acknowledging the message') +# ch.basic_ack(delivery_tag=method.delivery_tag) +# else: +# logging.warning('Channel closed, unable to ack message') +# +# loop = asyncio.get_event_loop() +# loop.create_task(handle_and_ack()) +# +# channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) +# for listen_queue_config in self.listen_queue_configs: +# queue_name: str = listen_queue_config.name +# on_message_callback = functools.partial(on_message, args=(listen_queue_config.handler,)) +# channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False) +# +# def on_channel_closed(self, ch, reason): +# logging.exception('Channel %i was closed: %s', ch, reason) +# raise Exception('Channel %i was closed: %s', ch, reason) +# +# def listen(self): +# try: +# logging.info('Starting IO Loop to listen for messages') +# set_consumer_status(is_healthy=True) +# self.connection.ioloop.start() +# except KeyboardInterrupt: +# logging.info('Keyboard Interrupt Closing') +# self.connection.close() +# except Exception as ex: +# set_consumer_status(is_healthy=False) +# self.connection.close() +# raise ex From ff4c64b938046773dfa5aa15615d0b0537300c81 Mon Sep 17 00:00:00 2001 From: shahaab Date: Wed, 6 Aug 2025 13:42:07 +0530 Subject: [PATCH 7/9] r_test_2.46.0 --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index c814082..2c35ffe 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ 'pika which is the officially recommended client for Rabbit MQ.', install_requires=[ 'pika==1.3.2', - 'pydantic==2.6.4', - 'starlette==0.36.3' + 'pydantic==2.11.3', + 'starlette==0.46.2' ] ) From adda96686202f171c8ae623686e1aa8ba8bc71c5 Mon Sep 17 00:00:00 2001 From: shahaab Date: Wed, 6 Aug 2025 15:02:10 +0530 Subject: [PATCH 8/9] r_test_2.46.0: removed old commented code --- .../single_threaded_multi_queue_listener.py | 80 ------------------- 1 file changed, 80 deletions(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index a83c529..00efc34 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -93,83 +93,3 @@ def listen(self): set_consumer_status(is_healthy=False) self.connection.close() raise ex - - -# import asyncio -# import functools -# import json -# import logging -# -# from pika import SelectConnection -# from pika.channel import Channel as PikaChannel -# from pika.spec import Basic as PikaBasic, BasicProperties as PikaBasicProperties -# -# from rabbitmq_client.async_connection.connection import get_async_connection -# from rabbitmq_client.broker_config import BrokerConfig -# from rabbitmq_client.config import settings -# from rabbitmq_client.consumer.health_utils import set_consumer_status -# from rabbitmq_client.single_threaded_consumer.schemas import ListenQueueConfig -# -# -# class SingleThreadedMultiQueueListener(object): -# """ -# This is a single threaded multi queue listener, as opposed the other multi queue listener in the library. -# this listener does not implement threads, which enables us to trigger async methods while handling the messages -# """ -# def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[ListenQueueConfig]): -# logging.debug('Creating connection object and registering callbacks') -# self.connection = get_async_connection(broker_config=broker_config, -# on_connection_open=self.on_connection_open, -# on_connection_closed=self.on_connection_closed) -# self.listen_queue_configs = listen_queue_configs -# -# def on_connection_open(self, connection: SelectConnection): -# connection.channel(on_open_callback=self.on_channel_open) -# -# def on_connection_closed(self, connection, reason): -# logging.exception('Connection closed: %s', reason) -# raise Exception('Connection closed: %s', reason) -# -# def on_channel_open(self, channel: PikaChannel): -# channel.add_on_close_callback(self.on_channel_closed) -# -# def on_message(ch: PikaChannel, method: PikaBasic.Deliver, -# properties: PikaBasicProperties, body: bytes, args): -# message = json.loads(body.decode('utf8')) -# (handler,) = args -# -# async def handle_and_ack(): -# await handler.async_handle_message( -# method=method, properties=properties, message=message -# ) -# if ch.is_open: -# logging.debug('Channel is open, acknowledging the message') -# ch.basic_ack(delivery_tag=method.delivery_tag) -# else: -# logging.warning('Channel closed, unable to ack message') -# -# loop = asyncio.get_event_loop() -# loop.create_task(handle_and_ack()) -# -# channel.basic_qos(prefetch_count=settings.PREFETCH_COUNT) -# for listen_queue_config in self.listen_queue_configs: -# queue_name: str = listen_queue_config.name -# on_message_callback = functools.partial(on_message, args=(listen_queue_config.handler,)) -# channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=False) -# -# def on_channel_closed(self, ch, reason): -# logging.exception('Channel %i was closed: %s', ch, reason) -# raise Exception('Channel %i was closed: %s', ch, reason) -# -# def listen(self): -# try: -# logging.info('Starting IO Loop to listen for messages') -# set_consumer_status(is_healthy=True) -# self.connection.ioloop.start() -# except KeyboardInterrupt: -# logging.info('Keyboard Interrupt Closing') -# self.connection.close() -# except Exception as ex: -# set_consumer_status(is_healthy=False) -# self.connection.close() -# raise ex From 58aef7e8bcaf410a77b9cffbd523b301fcbcadcd Mon Sep 17 00:00:00 2001 From: shahaab Date: Tue, 12 Aug 2025 12:49:31 +0530 Subject: [PATCH 9/9] r_test_2.46.0: moved asyncio event loop to listen --- .../single_threaded_multi_queue_listener.py | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py index 00efc34..d093120 100644 --- a/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py +++ b/rabbitmq_client/single_threaded_consumer/single_threaded_multi_queue_listener.py @@ -29,11 +29,6 @@ def __init__(self, broker_config: BrokerConfig, listen_queue_configs: list[Liste ) self.listen_queue_configs = listen_queue_configs - # Set up a background asyncio event loop for async message handlers - self.loop = asyncio.new_event_loop() - self.loop_thread = threading.Thread(target=self._start_loop, daemon=True) - self.loop_thread.start() - def _start_loop(self): asyncio.set_event_loop(self.loop) self.loop.run_forever() @@ -83,13 +78,42 @@ def on_channel_closed(self, ch, reason): def listen(self): try: + logging.info('Starting background asyncio event loop for async message handlers') + # Set up a + self.loop = asyncio.new_event_loop() + self.loop_thread = threading.Thread(target=self._start_loop, daemon=True) + self.loop_thread.start() + logging.info('Starting IO Loop to listen for messages') set_consumer_status(is_healthy=True) self.connection.ioloop.start() except KeyboardInterrupt: logging.info('Keyboard Interrupt Closing') - self.connection.close() + self.stop() except Exception as ex: set_consumer_status(is_healthy=False) - self.connection.close() + self.stop() raise ex + + def stop(self): + logging.info("Stopping consumer...") + + # Stop pika I/O loop if running + try: + if self.connection and not self.connection.is_closed: + self.connection.close() + if self.connection and self.connection.ioloop.is_running: + self.connection.ioloop.stop() + except Exception as e: + logging.warning(f"Error stopping pika loop: {e}") + + # Stop asyncio loop + if self.loop and self.loop.is_running(): + self.loop.call_soon_threadsafe(self.loop.stop) + + # Wait for background loop thread to finish + if hasattr(self, "loop_thread") and self.loop_thread.is_alive(): + self.loop_thread.join(timeout=5) + + logging.info("Consumer stopped.") +