diff --git a/agents/matlab/matlab_agent/docs/interactive.md b/agents/matlab/matlab_agent/docs/interactive.md index c1bea9f0..a3b2f9ef 100644 --- a/agents/matlab/matlab_agent/docs/interactive.md +++ b/agents/matlab/matlab_agent/docs/interactive.md @@ -1,6 +1,10 @@ -# Interactive Frame Format +# Interactive Simulation -Frames exchanged during an interactive simulation are YAML documents with the following structure: +An interactive simulation exchanges small data frames with the MATLAB process while it is running. The agent keeps a persistent TCP connection to MATLAB and relays frames received from RabbitMQ to the process. Every response from MATLAB is sent back to the broker so that clients can consume it in real time. + +## Frame Format + +Each message flowing from the broker to the agent must be a YAML document of the form: ```yaml simulation: @@ -8,4 +12,14 @@ simulation: : ``` -The `inputs` section contains key/value pairs that are provided to the MATLAB simulation. Each frame sent from the message broker to the agent should conform to this format. +The `inputs` section contains the parameters that will be provided to the MATLAB simulation step. Frames published by MATLAB follow the same structure but typically include additional fields describing the current state or a completion flag. + +## Workflow + +1. **Start** – When the agent receives an `interactive` simulation request it launches an `InteractiveSimulator` in a dedicated thread. The simulator starts two local TCP servers: one for sending data to MATLAB and one for receiving its outputs. +2. **Connection Setup** – The simulator opens its own `BlockingConnection` to RabbitMQ. A queue named `Q..interactive.` is declared and bound to the routing key defined in the simulation payload (`inputs.stream_source`). +3. **Handshake** – After starting MATLAB, the agent waits for the MATLAB process to connect to both TCP ports. A simple handshake is performed by sending an empty JSON object. +4. **Main Loop** – The simulator continuously polls the dedicated RabbitMQ connection for new frames. Received inputs are forwarded over the TCP socket to MATLAB. Any JSON messages coming from MATLAB are packaged into responses using `create_response` and published with `send_result`. +5. **Completion** – When MATLAB signals that the simulation is finished (for example by sending a frame with `status: completed`) the simulator publishes the final response, closes the TCP servers and its RabbitMQ connection, and exits. + +This separation of connections ensures that no socket is shared between threads and allows the agent to stop the interactive simulation safely if requested. diff --git a/agents/matlab/matlab_agent/docs/threading.md b/agents/matlab/matlab_agent/docs/threading.md new file mode 100644 index 00000000..ea4f2e03 --- /dev/null +++ b/agents/matlab/matlab_agent/docs/threading.md @@ -0,0 +1,22 @@ +# Thread Management in the MATLAB Agent + +The agent separates RabbitMQ communication from MATLAB execution using a simple thread model. All RabbitMQ I/O occurs on a single consumer thread while each simulation runs in its own worker thread. This prevents thread‑safety problems in `pika` and allows the agent to interrupt simulations without blocking message handling. + +## Consumer (I/O) Thread + +`RabbitMQManager.start_consuming()` is called from the main thread of the agent. The method stores the thread that invokes it as the **I/O thread** and starts `pika`'s blocking consumption loop. Because `pika.BlockingConnection` is not thread safe, every publish must happen on this same thread. + +`RabbitMQManager.send_message()` checks whether the current thread matches the stored I/O thread. If a worker thread needs to publish a message, the method schedules the publish with `connection.add_callback_threadsafe()`. The callback is then executed on the consumer thread so all RabbitMQ operations are serialized on the connection. + +## Simulation Threads + +`MessageHandler` creates a concrete implementation of the `MatlabSimulator` interface whenever a new message arrives. The available implementations are `BatchSimulator`, `StreamingSimulator` and `InteractiveSimulator`. Calling `start()` on any simulator launches a new daemon thread and invokes its private `_execute()` method. + +The simulator thread performs the MATLAB computation and uses `RabbitMQManager.send_result()` to report progress or final data. Because `send_result()` relies on `send_message()` the actual publish always occurs on the I/O thread. + +Interactive simulations also need to read frames from RabbitMQ while running. To avoid sharing the consumer connection across threads the interactive controller opens a dedicated `BlockingConnection` inside its thread. Results are still published through the manager so they follow the same safe path. + +## Active Simulator and Stopping + +`MessageHandler` stores the currently running simulator instance in `active_simulator`. Only one simulator can run at a time. When a `STOP` command is received the handler calls `active_simulator.stop()` which clears an internal event. Each `_execute()` loop checks this flag and exits when it is cleared. This mechanism provides a cooperative way to terminate simulations without abruptly killing threads. + diff --git a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py index cdebb776..efdb4613 100644 --- a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py +++ b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py @@ -15,9 +15,15 @@ from .interfaces import IRabbitMQMessageHandler from ...utils.logger import get_logger from ...utils.create_response import create_response -from ...core.batch import handle_batch_simulation -from ...core.streaming import handle_streaming_simulation -from ...core.interactive import handle_interactive_simulation +from ...core.simulator import ( + MatlabSimulator, + BatchSimulator, + StreamingSimulator, + InteractiveSimulator, +) +from ...core.batch import handle_batch_simulation # Backwards compatibility +from ...core.streaming import handle_streaming_simulation # Backwards compat +from ...core.interactive import handle_interactive_simulation # Backwards compat from ...utils.commands import CommandRegistry logger = get_logger() @@ -99,6 +105,7 @@ def __init__(self, agent_id: str, rabbitmq_manager: Any, self.response_templates = self.config.get( 'response_templates', {}) self.interactive_queues: Dict[str, queue.Queue] = {} + self.active_simulator: Optional[MatlabSimulator] = None def get_agent_id(self) -> str: """ @@ -145,7 +152,11 @@ def handle_message( cmd = msg_dict['command'].upper() if cmd == 'STOP': # Signal to stop the current simulation - logger.info("Received STOP command, signaling to stop simulation") + logger.info( + "Received STOP command, signaling to stop simulation" + ) + if self.active_simulator and self.active_simulator.is_running(): + self.active_simulator.stop() CommandRegistry.stop() elif cmd == 'RUN': # Reset the stop event to allow running simulations @@ -238,34 +249,39 @@ def handle_message( logger.info("Received simulation type: %s", sim_type) # Process based on simulation type if sim_type == 'batch': - handle_batch_simulation( + self.active_simulator = BatchSimulator( msg_dict, source, self.rabbitmq_manager, self.path_simulation, - self.response_templates) + self.response_templates, + ) + self.active_simulator.start() ch.basic_ack(delivery_tag=method.delivery_tag) elif sim_type == 'streaming': ch.basic_ack(delivery_tag=method.delivery_tag) - tcp_settings = self.config.get( - 'tcp', {}) - handle_streaming_simulation( - msg_dict, source, + tcp_settings = self.config.get('tcp', {}) + self.active_simulator = StreamingSimulator( + msg_dict, + source, self.rabbitmq_manager, self.path_simulation, self.response_templates, - tcp_settings + tcp_settings, ) + self.active_simulator.start() elif sim_type == 'interactive': ch.basic_ack(delivery_tag=method.delivery_tag) tcp_settings = self.config.get('tcp', {}) - handle_interactive_simulation( - msg_dict, source, + self.active_simulator = InteractiveSimulator( + msg_dict, + source, self.rabbitmq_manager, self.path_simulation, self.response_templates, - tcp_settings + tcp_settings, ) + self.active_simulator.start() else: logger.error("Unknown simulation type: %s", sim_type) error_response = create_response( diff --git a/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py b/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py index 23bb5fa3..4d94b726 100644 --- a/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py +++ b/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py @@ -7,6 +7,7 @@ import ssl import uuid from typing import Dict, Any, Callable, Optional +import threading import yaml import pika @@ -37,6 +38,7 @@ def __init__(self, agent_id: str, config: Dict[str, Any]) -> None: self.config: Dict[str, Any] = config self.connection: Optional[pika.BlockingConnection] = None self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None + self._io_thread: Optional[threading.Thread] = None self.input_queue_name: str = f'Q.sim.{self.agent_id}' self.message_handler: Optional[Callable[[ pika.adapters.blocking_connection.BlockingChannel, @@ -207,6 +209,7 @@ def start_consuming(self) -> None: return try: + self._io_thread = threading.current_thread() self.channel.basic_consume( queue=self.input_queue_name, on_message_callback=self.message_handler @@ -243,26 +246,34 @@ def send_message( Returns: bool: True if successful, False otherwise """ - try: - self.channel.basic_publish( - exchange=exchange, - routing_key=routing_key, - body=body, - properties=properties or pika.BasicProperties( - delivery_mode=2 # Persistent message + def _publish() -> bool: + try: + self.channel.basic_publish( + exchange=exchange, + routing_key=routing_key, + body=body, + properties=properties or pika.BasicProperties( + delivery_mode=2 # Persistent message + ) ) - ) - logger.debug( - "Sent message to exchange %s with routing key %s", - exchange, - routing_key) + logger.debug( + "Sent message to exchange %s with routing key %s", + exchange, + routing_key) + return True + except pika.exceptions.AMQPError as e: + logger.error("Failed to send message: %s", e) + return False + except Exception as e: # pragma: no cover - unexpected errors + logger.error("Unexpected error: %s", e) + return False + + if self._io_thread and threading.current_thread() is not self._io_thread: + assert self.connection is not None + self.connection.add_callback_threadsafe(_publish) return True - except pika.exceptions.AMQPError as e: - logger.error("Failed to send message: %s", e) - return False - except Exception as e: - logger.error("Unexpected error: %s", e) - return False + + return _publish() def send_result(self, destination: str, result: Dict[str, Any]) -> bool: """ diff --git a/agents/matlab/matlab_agent/src/core/interactive.py b/agents/matlab/matlab_agent/src/core/interactive.py index 954b3f87..4968ff02 100644 --- a/agents/matlab/matlab_agent/src/core/interactive.py +++ b/agents/matlab/matlab_agent/src/core/interactive.py @@ -9,6 +9,9 @@ from pathlib import Path from select import select from typing import Any, Dict, Optional +import ssl + +import pika import psutil import yaml @@ -24,7 +27,7 @@ DEFAULT_INPUT_PORT, DEFAULT_OUTPUT_PORT, MAX_FILENAME_LENGTH, - EXCHANGE_INPUT_STREAM + EXCHANGE_INPUT_STREAM, ) logger = get_logger() @@ -62,8 +65,7 @@ def accept(self) -> None: self._conn, _ = self._srv.accept() self._conn.setblocking(False) else: - logger.error( - "[INTERACTIVE] Timeout waiting for client connection.") + logger.error("[INTERACTIVE] Timeout waiting for client connection.") raise TimeoutError("No client connection received in time.") def send(self, data: Dict[str, Any]) -> None: @@ -95,7 +97,9 @@ def recv_all(self) -> list[Dict[str, Any]]: continue try: messages.append(json.loads(line.decode())) - except json.JSONDecodeError as exc: # pragma: no cover - logs error and skips invalid message + except ( + json.JSONDecodeError + ) as exc: # pragma: no cover - logs error and skips invalid message logger.error("[INTERACTIVE] Invalid JSON: %s", exc) messages.append({"error": f"Invalid JSON: {str(exc)}"}) # Return the messages list, i.e., all complete decoded JSON objects (plus any error placeholders) available at that moment. @@ -179,9 +183,13 @@ def start(self, pm: PerformanceMonitor) -> None: self.start_time = time.time() self.out_srv.start() self.in_srv.start() - logger.debug("[INTERACTIVE] TCP servers started on %s:%s and %s:%s", - self.out_srv.host, self.out_srv.port, - self.in_srv.host, self.in_srv.port) + logger.debug( + "[INTERACTIVE] TCP servers started on %s:%s and %s:%s", + self.out_srv.host, + self.out_srv.port, + self.in_srv.host, + self.in_srv.port, + ) self._start_matlab() logger.debug("[INTERACTIVE] Waiting for MATLAB to start...") pm.record_matlab_startup_complete() @@ -215,9 +223,7 @@ def _only_inputs(frame: Dict[str, Any]) -> Dict[str, Any]: if isinstance(frame, dict): sim = frame.get("simulation") if isinstance(sim, dict) and "inputs" in sim: - logger.debug( - "[INTERACTIVE] Received inputs: %s", - sim["inputs"]) + logger.debug("[INTERACTIVE] Received inputs: %s", sim["inputs"]) return sim["inputs"] return frame @@ -226,32 +232,67 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: sim = msg_dict["simulation"] stream_key = sim["inputs"]["stream_source"].replace("rabbitmq://", "") - ch = self.broker.channel + # Interactive simulations run in their own thread. Create a dedicated + # connection so all RabbitMQ operations occur on this thread and do + # not interfere with the consumer's connection. + rabbitmq_cfg = self.broker.config.get("rabbitmq", {}) + credentials = pika.PlainCredentials( + rabbitmq_cfg.get("username", "guest"), + rabbitmq_cfg.get("password", "guest"), + ) + vhost = rabbitmq_cfg.get("vhost", "/") + if rabbitmq_cfg.get("tls", False): + context = ssl.create_default_context() + ssl_options = pika.SSLOptions( + context, + rabbitmq_cfg.get("host", "localhost"), + ) + parameters = pika.ConnectionParameters( + host=rabbitmq_cfg.get("host", "localhost"), + port=rabbitmq_cfg.get("port", 5671), + virtual_host=vhost, + credentials=credentials, + ssl_options=ssl_options, + heartbeat=rabbitmq_cfg.get("heartbeat", 600), + ) + else: + parameters = pika.ConnectionParameters( + host=rabbitmq_cfg.get("host", "localhost"), + port=rabbitmq_cfg.get("port", 5672), + virtual_host=vhost, + credentials=credentials, + heartbeat=rabbitmq_cfg.get("heartbeat", 600), + ) + connection = pika.BlockingConnection(parameters) + ch = connection.channel() qname = f"Q.{self.agent_id}.interactive.{self.request_id}" ch.exchange_declare( exchange=EXCHANGE_INPUT_STREAM, exchange_type="topic", - durable=True) + durable=True, + ) ch.queue_declare(queue=qname, durable=True) ch.queue_bind( exchange=EXCHANGE_INPUT_STREAM, queue=qname, - routing_key=stream_key) + routing_key=stream_key, + ) try: while True: - if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None: + if ( + self.out_srv.matlab_proc + and self.out_srv.matlab_proc.poll() is not None + ): logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop") break - method, _ , body = ch.basic_get( - queue=qname, auto_ack=True) + method, _, body = ch.basic_get(queue=qname, auto_ack=True) while method: frame = _parse_frame(body) if frame: # Send the inputs to MATLAB self.in_srv.send(self._only_inputs(frame)) - method, _ , body = ch.basic_get( - queue=qname, auto_ack=True) + method, _, body = ch.basic_get(queue=qname, auto_ack=True) # Receive Responses from MATLAB for resp in self.out_srv.recv_all(): @@ -265,6 +306,10 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: logger.info("[INTERACTIVE] Interrupted by user") finally: pm.record_simulation_complete() + try: + connection.close() + except Exception: # pragma: no cover - cleanup errors + pass def close(self) -> None: """Close the TCP servers""" @@ -275,8 +320,7 @@ def metadata(self) -> Dict[str, Any]: meta: Dict[str, Any] = {} if self.start_time: meta["execution_time"] = time.time() - self.start_time - meta["memory_usage"] = psutil.Process( - ).memory_info().rss // BYTES_IN_MB + meta["memory_usage"] = psutil.Process().memory_info().rss // BYTES_IN_MB return meta @@ -291,9 +335,7 @@ def handle_interactive_simulation( pm = PerformanceMonitor() sim = msg_dict["simulation"] pm.start_operation(sim["request_id"]) - logger.debug( - "[INTERACTIVE] Starting interactive simulation: %s", - sim["file"]) + logger.debug("[INTERACTIVE] Starting interactive simulation: %s", sim["file"]) controller = MatlabInteractiveController( path_simulation or sim.get("path"), sim["file"], diff --git a/agents/matlab/matlab_agent/src/core/simulator.py b/agents/matlab/matlab_agent/src/core/simulator.py new file mode 100644 index 00000000..28fef948 --- /dev/null +++ b/agents/matlab/matlab_agent/src/core/simulator.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +"""Unified MATLAB simulation interface and implementations.""" + +from abc import ABC, abstractmethod +import threading +from typing import Any, Dict, Optional + +from ..comm.interfaces import IMessageBroker +from .batch import handle_batch_simulation +from .streaming import handle_streaming_simulation +from .interactive import handle_interactive_simulation + + +class MatlabSimulator(ABC): + """Abstract base class for MATLAB simulations.""" + + def __init__(self) -> None: + self._thread: Optional[threading.Thread] = None + self._running = threading.Event() + + def start(self) -> None: + """Start the simulation in a separate thread.""" + if self._running.is_set(): + return + self._running.set() + self._thread = threading.Thread(target=self._execute, daemon=True) + self._thread.start() + + def stop(self) -> None: + """Signal the simulation to stop.""" + self._running.clear() + + def is_running(self) -> bool: + """Check if the simulation thread is active.""" + return self._running.is_set() + + @abstractmethod + def _execute(self) -> None: + """Execute the simulation logic.""" + + +class BatchSimulator(MatlabSimulator): + """Run MATLAB batch simulations.""" + + def __init__( + self, + msg_dict: Dict[str, Any], + source: str, + broker: IMessageBroker, + path_simulation: str, + response_templates: Dict[str, Any], + ) -> None: + super().__init__() + self.msg_dict = msg_dict + self.source = source + self.broker = broker + self.path = path_simulation + self.templates = response_templates + + def _execute(self) -> None: # pragma: no cover - thin wrapper + handle_batch_simulation( + self.msg_dict, self.source, self.broker, self.path, self.templates + ) + self.stop() + + +class StreamingSimulator(MatlabSimulator): + """Run MATLAB streaming simulations.""" + + def __init__( + self, + msg_dict: Dict[str, Any], + source: str, + broker: IMessageBroker, + path_simulation: str, + response_templates: Dict[str, Any], + tcp_settings: Dict[str, Any], + ) -> None: + super().__init__() + self.msg_dict = msg_dict + self.source = source + self.broker = broker + self.path = path_simulation + self.templates = response_templates + self.tcp = tcp_settings + + def _execute(self) -> None: # pragma: no cover - thin wrapper + handle_streaming_simulation( + self.msg_dict, + self.source, + self.broker, + self.path, + self.templates, + self.tcp, + ) + self.stop() + + +class InteractiveSimulator(MatlabSimulator): + """Run MATLAB interactive simulations.""" + + def __init__( + self, + msg_dict: Dict[str, Any], + source: str, + broker: IMessageBroker, + path_simulation: str, + response_templates: Dict[str, Any], + tcp_settings: Dict[str, Any], + ) -> None: + super().__init__() + self.msg_dict = msg_dict + self.source = source + self.broker = broker + self.path = path_simulation + self.templates = response_templates + self.tcp = tcp_settings + + def _execute(self) -> None: # pragma: no cover - thin wrapper + handle_interactive_simulation( + self.msg_dict, + self.source, + self.broker, + self.path, + self.templates, + self.tcp, + ) + self.stop() diff --git a/agents/matlab/matlab_agent/test/unit/test_message_handler.py b/agents/matlab/matlab_agent/test/unit/test_message_handler.py index f3893e7b..347a1968 100644 --- a/agents/matlab/matlab_agent/test/unit/test_message_handler.py +++ b/agents/matlab/matlab_agent/test/unit/test_message_handler.py @@ -145,10 +145,10 @@ def test_get_agent_id_returns_correct_id(self): assert self.handler.get_agent_id() == self.agent_id @patch('src.comm.rabbitmq.message_handler.yaml.safe_load') - @patch('src.comm.rabbitmq.message_handler.handle_batch_simulation') + @patch('src.comm.rabbitmq.message_handler.BatchSimulator') @patch('src.comm.rabbitmq.message_handler.create_response') def test_handle_message_batch_simulation_success( - self, mock_create_response, mock_handle_batch, mock_yaml_load + self, mock_create_response, mock_batch_cls, mock_yaml_load ): """Test successful handling of batch simulation message.""" # Setup valid message data @@ -175,22 +175,23 @@ def test_handle_message_batch_simulation_success( # Verify mock_yaml_load.assert_called_once_with(b'test message body') - mock_handle_batch.assert_called_once_with( + mock_batch_cls.assert_called_once_with( message_data, 'source', self.mock_rabbitmq_manager, '/test/path', {'error': 'error_template'} ) + mock_batch_cls.return_value.start.assert_called_once() self.mock_channel.basic_ack.assert_called_once_with( delivery_tag="test_tag" ) @patch('src.comm.rabbitmq.message_handler.yaml.safe_load') - @patch('src.comm.rabbitmq.message_handler.handle_streaming_simulation') + @patch('src.comm.rabbitmq.message_handler.StreamingSimulator') @patch('src.comm.rabbitmq.message_handler.create_response') def test_handle_message_streaming_simulation_success( - self, mock_create_response, mock_handle_streaming, mock_yaml_load + self, mock_create_response, mock_stream_cls, mock_yaml_load ): """Test successful handling of streaming simulation message.""" # Setup valid message data @@ -217,7 +218,7 @@ def test_handle_message_streaming_simulation_success( # Verify mock_yaml_load.assert_called_once_with(b'test message body') - mock_handle_streaming.assert_called_once_with( + mock_stream_cls.assert_called_once_with( message_data, 'source', self.mock_rabbitmq_manager, @@ -225,6 +226,7 @@ def test_handle_message_streaming_simulation_success( {'error': 'error_template'}, {'port': 8080} ) + mock_stream_cls.return_value.start.assert_called_once() self.mock_channel.basic_ack.assert_called_once_with( delivery_tag="test_tag" ) @@ -495,7 +497,7 @@ def test_routing_key_extraction(self, mock_yaml_load): self.mock_method.routing_key = routing_key expected_source = routing_key.split('.')[0] - with patch('src.comm.rabbitmq.message_handler.handle_batch_simulation') as mock_batch: + with patch('src.comm.rabbitmq.message_handler.BatchSimulator') as mock_batch_cls: self.handler.handle_message( self.mock_channel, self.mock_method, @@ -504,11 +506,11 @@ def test_routing_key_extraction(self, mock_yaml_load): ) # Verify source is correctly extracted - mock_batch.assert_called_once() - call_args = mock_batch.call_args[0] + mock_batch_cls.assert_called_once() + call_args = mock_batch_cls.call_args[0] assert call_args[1] == expected_source # source parameter - mock_batch.reset_mock() + mock_batch_cls.reset_mock() # Integration test class for end-to-end testing @@ -530,8 +532,8 @@ def setup_method(self): self.config ) - @patch('src.comm.rabbitmq.message_handler.handle_batch_simulation') - def test_complete_batch_message_flow(self, mock_handle_batch): + @patch('src.comm.rabbitmq.message_handler.BatchSimulator') + def test_complete_batch_message_flow(self, mock_batch_cls): """Test complete flow of a valid batch message.""" # Create complete valid message message_dict = { @@ -578,8 +580,8 @@ def test_complete_batch_message_flow(self, mock_handle_batch): ) # Verify successful processing - mock_handle_batch.assert_called_once() - call_args = mock_handle_batch.call_args[0] + mock_batch_cls.assert_called_once() + call_args = mock_batch_cls.call_args[0] # Verify all parameters passed correctly assert call_args[0] == message_dict # message data @@ -592,9 +594,10 @@ def test_complete_batch_message_flow(self, mock_handle_batch): mock_channel.basic_ack.assert_called_once_with( delivery_tag="integration_tag" ) + mock_batch_cls.return_value.start.assert_called_once() - @patch('src.comm.rabbitmq.message_handler.handle_streaming_simulation') - def test_complete_streaming_message_flow(self, mock_handle_streaming): + @patch('src.comm.rabbitmq.message_handler.StreamingSimulator') + def test_complete_streaming_message_flow(self, mock_stream_cls): """Test complete flow of a valid streaming message.""" # Create complete valid streaming message message_dict = { @@ -636,8 +639,8 @@ def test_complete_streaming_message_flow(self, mock_handle_streaming): ) # Verify successful processing - mock_handle_streaming.assert_called_once() - call_args = mock_handle_streaming.call_args[0] + mock_stream_cls.assert_called_once() + call_args = mock_stream_cls.call_args[0] # Verify all parameters passed correctly assert call_args[0] == message_dict # message data @@ -653,3 +656,4 @@ def test_complete_streaming_message_flow(self, mock_handle_streaming): mock_channel.basic_ack.assert_called_once_with( delivery_tag="streaming_tag" ) + mock_stream_cls.return_value.start.assert_called_once()