From cbbdd66c6ad2416419e351f22af31f236a8109d4 Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 11:53:59 +0200 Subject: [PATCH 1/5] Add MatlabSimulator interface and integrate with message handler --- .../src/comm/rabbitmq/message_handler.py | 44 ++++-- .../matlab/matlab_agent/src/core/simulator.py | 129 ++++++++++++++++++ 2 files changed, 159 insertions(+), 14 deletions(-) create mode 100644 agents/matlab/matlab_agent/src/core/simulator.py diff --git a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py index cdebb776..efdb4613 100644 --- a/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py +++ b/agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py @@ -15,9 +15,15 @@ from .interfaces import IRabbitMQMessageHandler from ...utils.logger import get_logger from ...utils.create_response import create_response -from ...core.batch import handle_batch_simulation -from ...core.streaming import handle_streaming_simulation -from ...core.interactive import handle_interactive_simulation +from ...core.simulator import ( + MatlabSimulator, + BatchSimulator, + StreamingSimulator, + InteractiveSimulator, +) +from ...core.batch import handle_batch_simulation # Backwards compatibility +from ...core.streaming import handle_streaming_simulation # Backwards compat +from ...core.interactive import handle_interactive_simulation # Backwards compat from ...utils.commands import CommandRegistry logger = get_logger() @@ -99,6 +105,7 @@ def __init__(self, agent_id: str, rabbitmq_manager: Any, self.response_templates = self.config.get( 'response_templates', {}) self.interactive_queues: Dict[str, queue.Queue] = {} + self.active_simulator: Optional[MatlabSimulator] = None def get_agent_id(self) -> str: """ @@ -145,7 +152,11 @@ def handle_message( cmd = msg_dict['command'].upper() if cmd == 'STOP': # Signal to stop the current simulation - logger.info("Received STOP command, signaling to stop simulation") + logger.info( + "Received STOP command, signaling to stop simulation" + ) + if self.active_simulator and self.active_simulator.is_running(): + self.active_simulator.stop() CommandRegistry.stop() elif cmd == 'RUN': # Reset the stop event to allow running simulations @@ -238,34 +249,39 @@ def handle_message( logger.info("Received simulation type: %s", sim_type) # Process based on simulation type if sim_type == 'batch': - handle_batch_simulation( + self.active_simulator = BatchSimulator( msg_dict, source, self.rabbitmq_manager, self.path_simulation, - self.response_templates) + self.response_templates, + ) + self.active_simulator.start() ch.basic_ack(delivery_tag=method.delivery_tag) elif sim_type == 'streaming': ch.basic_ack(delivery_tag=method.delivery_tag) - tcp_settings = self.config.get( - 'tcp', {}) - handle_streaming_simulation( - msg_dict, source, + tcp_settings = self.config.get('tcp', {}) + self.active_simulator = StreamingSimulator( + msg_dict, + source, self.rabbitmq_manager, self.path_simulation, self.response_templates, - tcp_settings + tcp_settings, ) + self.active_simulator.start() elif sim_type == 'interactive': ch.basic_ack(delivery_tag=method.delivery_tag) tcp_settings = self.config.get('tcp', {}) - handle_interactive_simulation( - msg_dict, source, + self.active_simulator = InteractiveSimulator( + msg_dict, + source, self.rabbitmq_manager, self.path_simulation, self.response_templates, - tcp_settings + tcp_settings, ) + self.active_simulator.start() else: logger.error("Unknown simulation type: %s", sim_type) error_response = create_response( diff --git a/agents/matlab/matlab_agent/src/core/simulator.py b/agents/matlab/matlab_agent/src/core/simulator.py new file mode 100644 index 00000000..28fef948 --- /dev/null +++ b/agents/matlab/matlab_agent/src/core/simulator.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +"""Unified MATLAB simulation interface and implementations.""" + +from abc import ABC, abstractmethod +import threading +from typing import Any, Dict, Optional + +from ..comm.interfaces import IMessageBroker +from .batch import handle_batch_simulation +from .streaming import handle_streaming_simulation +from .interactive import handle_interactive_simulation + + +class MatlabSimulator(ABC): + """Abstract base class for MATLAB simulations.""" + + def __init__(self) -> None: + self._thread: Optional[threading.Thread] = None + self._running = threading.Event() + + def start(self) -> None: + """Start the simulation in a separate thread.""" + if self._running.is_set(): + return + self._running.set() + self._thread = threading.Thread(target=self._execute, daemon=True) + self._thread.start() + + def stop(self) -> None: + """Signal the simulation to stop.""" + self._running.clear() + + def is_running(self) -> bool: + """Check if the simulation thread is active.""" + return self._running.is_set() + + @abstractmethod + def _execute(self) -> None: + """Execute the simulation logic.""" + + +class BatchSimulator(MatlabSimulator): + """Run MATLAB batch simulations.""" + + def __init__( + self, + msg_dict: Dict[str, Any], + source: str, + broker: IMessageBroker, + path_simulation: str, + response_templates: Dict[str, Any], + ) -> None: + super().__init__() + self.msg_dict = msg_dict + self.source = source + self.broker = broker + self.path = path_simulation + self.templates = response_templates + + def _execute(self) -> None: # pragma: no cover - thin wrapper + handle_batch_simulation( + self.msg_dict, self.source, self.broker, self.path, self.templates + ) + self.stop() + + +class StreamingSimulator(MatlabSimulator): + """Run MATLAB streaming simulations.""" + + def __init__( + self, + msg_dict: Dict[str, Any], + source: str, + broker: IMessageBroker, + path_simulation: str, + response_templates: Dict[str, Any], + tcp_settings: Dict[str, Any], + ) -> None: + super().__init__() + self.msg_dict = msg_dict + self.source = source + self.broker = broker + self.path = path_simulation + self.templates = response_templates + self.tcp = tcp_settings + + def _execute(self) -> None: # pragma: no cover - thin wrapper + handle_streaming_simulation( + self.msg_dict, + self.source, + self.broker, + self.path, + self.templates, + self.tcp, + ) + self.stop() + + +class InteractiveSimulator(MatlabSimulator): + """Run MATLAB interactive simulations.""" + + def __init__( + self, + msg_dict: Dict[str, Any], + source: str, + broker: IMessageBroker, + path_simulation: str, + response_templates: Dict[str, Any], + tcp_settings: Dict[str, Any], + ) -> None: + super().__init__() + self.msg_dict = msg_dict + self.source = source + self.broker = broker + self.path = path_simulation + self.templates = response_templates + self.tcp = tcp_settings + + def _execute(self) -> None: # pragma: no cover - thin wrapper + handle_interactive_simulation( + self.msg_dict, + self.source, + self.broker, + self.path, + self.templates, + self.tcp, + ) + self.stop() From cb6b21737708b23bcfbf1d3482a75bdf07ccad61 Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 12:07:21 +0200 Subject: [PATCH 2/5] Fix RabbitMQ thread safety --- .../src/comm/rabbitmq/rabbitmq_manager.py | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py b/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py index 23bb5fa3..4d94b726 100644 --- a/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py +++ b/agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py @@ -7,6 +7,7 @@ import ssl import uuid from typing import Dict, Any, Callable, Optional +import threading import yaml import pika @@ -37,6 +38,7 @@ def __init__(self, agent_id: str, config: Dict[str, Any]) -> None: self.config: Dict[str, Any] = config self.connection: Optional[pika.BlockingConnection] = None self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None + self._io_thread: Optional[threading.Thread] = None self.input_queue_name: str = f'Q.sim.{self.agent_id}' self.message_handler: Optional[Callable[[ pika.adapters.blocking_connection.BlockingChannel, @@ -207,6 +209,7 @@ def start_consuming(self) -> None: return try: + self._io_thread = threading.current_thread() self.channel.basic_consume( queue=self.input_queue_name, on_message_callback=self.message_handler @@ -243,26 +246,34 @@ def send_message( Returns: bool: True if successful, False otherwise """ - try: - self.channel.basic_publish( - exchange=exchange, - routing_key=routing_key, - body=body, - properties=properties or pika.BasicProperties( - delivery_mode=2 # Persistent message + def _publish() -> bool: + try: + self.channel.basic_publish( + exchange=exchange, + routing_key=routing_key, + body=body, + properties=properties or pika.BasicProperties( + delivery_mode=2 # Persistent message + ) ) - ) - logger.debug( - "Sent message to exchange %s with routing key %s", - exchange, - routing_key) + logger.debug( + "Sent message to exchange %s with routing key %s", + exchange, + routing_key) + return True + except pika.exceptions.AMQPError as e: + logger.error("Failed to send message: %s", e) + return False + except Exception as e: # pragma: no cover - unexpected errors + logger.error("Unexpected error: %s", e) + return False + + if self._io_thread and threading.current_thread() is not self._io_thread: + assert self.connection is not None + self.connection.add_callback_threadsafe(_publish) return True - except pika.exceptions.AMQPError as e: - logger.error("Failed to send message: %s", e) - return False - except Exception as e: - logger.error("Unexpected error: %s", e) - return False + + return _publish() def send_result(self, destination: str, result: Dict[str, Any]) -> bool: """ From 52097c91608a15ba2c9b7d60a436d8656ead912d Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 13:59:55 +0200 Subject: [PATCH 3/5] Update tests for simulator classes and add threading docs --- agents/matlab/matlab_agent/docs/threading.md | 21 ++++++++++ .../test/unit/test_message_handler.py | 40 ++++++++++--------- 2 files changed, 43 insertions(+), 18 deletions(-) create mode 100644 agents/matlab/matlab_agent/docs/threading.md diff --git a/agents/matlab/matlab_agent/docs/threading.md b/agents/matlab/matlab_agent/docs/threading.md new file mode 100644 index 00000000..01fa107a --- /dev/null +++ b/agents/matlab/matlab_agent/docs/threading.md @@ -0,0 +1,21 @@ +# Thread Management in the MATLAB Agent + +The MATLAB agent relies on a dedicated thread to handle RabbitMQ I/O while simulations run in their own worker threads. This separation avoids thread safety issues with `pika` and allows the agent to interrupt simulations cleanly. + +## Consumer Thread + +When `RabbitMQManager.start_consuming` is invoked, the calling thread becomes the **I/O thread**. Every message received from RabbitMQ is processed on this thread. Because `pika.BlockingConnection` is not thread safe, any publish operation must occur on this same thread. + +`RabbitMQManager.send_message` checks the current thread against the stored I/O thread. If a different thread needs to publish a message (for example from a running simulation), the method schedules the publish with `connection.add_callback_threadsafe`. This guarantees that all communication with RabbitMQ happens on the consumer thread. + +## Simulation Threads + +`MessageHandler` creates an instance of `BatchSimulator`, `StreamingSimulator` or `InteractiveSimulator` depending on the message type. Each simulator implements the common `MatlabSimulator` interface. Calling `start()` launches a new thread where the simulator executes its `_execute` logic. + +The simulator thread performs the MATLAB work and uses the broker to publish results. Because `send_message` uses the callback mechanism described above, these results are sent safely through the consumer thread. + +The handler stores a reference to the active simulator. It can call `stop()` in response to a `STOP` command to signal the thread to terminate. Only one simulator is active at a time, simplifying resource management. + +## Stopping + +A simulator sets an internal event when running. Calling `stop()` clears this event. The `_execute` loop in each implementation checks this flag and exits when it is cleared, ensuring that MATLAB executions can be interrupted without forcing the thread to stop abruptly. diff --git a/agents/matlab/matlab_agent/test/unit/test_message_handler.py b/agents/matlab/matlab_agent/test/unit/test_message_handler.py index f3893e7b..347a1968 100644 --- a/agents/matlab/matlab_agent/test/unit/test_message_handler.py +++ b/agents/matlab/matlab_agent/test/unit/test_message_handler.py @@ -145,10 +145,10 @@ def test_get_agent_id_returns_correct_id(self): assert self.handler.get_agent_id() == self.agent_id @patch('src.comm.rabbitmq.message_handler.yaml.safe_load') - @patch('src.comm.rabbitmq.message_handler.handle_batch_simulation') + @patch('src.comm.rabbitmq.message_handler.BatchSimulator') @patch('src.comm.rabbitmq.message_handler.create_response') def test_handle_message_batch_simulation_success( - self, mock_create_response, mock_handle_batch, mock_yaml_load + self, mock_create_response, mock_batch_cls, mock_yaml_load ): """Test successful handling of batch simulation message.""" # Setup valid message data @@ -175,22 +175,23 @@ def test_handle_message_batch_simulation_success( # Verify mock_yaml_load.assert_called_once_with(b'test message body') - mock_handle_batch.assert_called_once_with( + mock_batch_cls.assert_called_once_with( message_data, 'source', self.mock_rabbitmq_manager, '/test/path', {'error': 'error_template'} ) + mock_batch_cls.return_value.start.assert_called_once() self.mock_channel.basic_ack.assert_called_once_with( delivery_tag="test_tag" ) @patch('src.comm.rabbitmq.message_handler.yaml.safe_load') - @patch('src.comm.rabbitmq.message_handler.handle_streaming_simulation') + @patch('src.comm.rabbitmq.message_handler.StreamingSimulator') @patch('src.comm.rabbitmq.message_handler.create_response') def test_handle_message_streaming_simulation_success( - self, mock_create_response, mock_handle_streaming, mock_yaml_load + self, mock_create_response, mock_stream_cls, mock_yaml_load ): """Test successful handling of streaming simulation message.""" # Setup valid message data @@ -217,7 +218,7 @@ def test_handle_message_streaming_simulation_success( # Verify mock_yaml_load.assert_called_once_with(b'test message body') - mock_handle_streaming.assert_called_once_with( + mock_stream_cls.assert_called_once_with( message_data, 'source', self.mock_rabbitmq_manager, @@ -225,6 +226,7 @@ def test_handle_message_streaming_simulation_success( {'error': 'error_template'}, {'port': 8080} ) + mock_stream_cls.return_value.start.assert_called_once() self.mock_channel.basic_ack.assert_called_once_with( delivery_tag="test_tag" ) @@ -495,7 +497,7 @@ def test_routing_key_extraction(self, mock_yaml_load): self.mock_method.routing_key = routing_key expected_source = routing_key.split('.')[0] - with patch('src.comm.rabbitmq.message_handler.handle_batch_simulation') as mock_batch: + with patch('src.comm.rabbitmq.message_handler.BatchSimulator') as mock_batch_cls: self.handler.handle_message( self.mock_channel, self.mock_method, @@ -504,11 +506,11 @@ def test_routing_key_extraction(self, mock_yaml_load): ) # Verify source is correctly extracted - mock_batch.assert_called_once() - call_args = mock_batch.call_args[0] + mock_batch_cls.assert_called_once() + call_args = mock_batch_cls.call_args[0] assert call_args[1] == expected_source # source parameter - mock_batch.reset_mock() + mock_batch_cls.reset_mock() # Integration test class for end-to-end testing @@ -530,8 +532,8 @@ def setup_method(self): self.config ) - @patch('src.comm.rabbitmq.message_handler.handle_batch_simulation') - def test_complete_batch_message_flow(self, mock_handle_batch): + @patch('src.comm.rabbitmq.message_handler.BatchSimulator') + def test_complete_batch_message_flow(self, mock_batch_cls): """Test complete flow of a valid batch message.""" # Create complete valid message message_dict = { @@ -578,8 +580,8 @@ def test_complete_batch_message_flow(self, mock_handle_batch): ) # Verify successful processing - mock_handle_batch.assert_called_once() - call_args = mock_handle_batch.call_args[0] + mock_batch_cls.assert_called_once() + call_args = mock_batch_cls.call_args[0] # Verify all parameters passed correctly assert call_args[0] == message_dict # message data @@ -592,9 +594,10 @@ def test_complete_batch_message_flow(self, mock_handle_batch): mock_channel.basic_ack.assert_called_once_with( delivery_tag="integration_tag" ) + mock_batch_cls.return_value.start.assert_called_once() - @patch('src.comm.rabbitmq.message_handler.handle_streaming_simulation') - def test_complete_streaming_message_flow(self, mock_handle_streaming): + @patch('src.comm.rabbitmq.message_handler.StreamingSimulator') + def test_complete_streaming_message_flow(self, mock_stream_cls): """Test complete flow of a valid streaming message.""" # Create complete valid streaming message message_dict = { @@ -636,8 +639,8 @@ def test_complete_streaming_message_flow(self, mock_handle_streaming): ) # Verify successful processing - mock_handle_streaming.assert_called_once() - call_args = mock_handle_streaming.call_args[0] + mock_stream_cls.assert_called_once() + call_args = mock_stream_cls.call_args[0] # Verify all parameters passed correctly assert call_args[0] == message_dict # message data @@ -653,3 +656,4 @@ def test_complete_streaming_message_flow(self, mock_handle_streaming): mock_channel.basic_ack.assert_called_once_with( delivery_tag="streaming_tag" ) + mock_stream_cls.return_value.start.assert_called_once() From cf97e651ce5c78ad6220bb8b31734cd005c30c46 Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 14:10:01 +0200 Subject: [PATCH 4/5] fix interactive simulation threading --- agents/matlab/matlab_agent/docs/threading.md | 6 ++ .../matlab_agent/src/core/interactive.py | 88 ++++++++++++++----- 2 files changed, 71 insertions(+), 23 deletions(-) diff --git a/agents/matlab/matlab_agent/docs/threading.md b/agents/matlab/matlab_agent/docs/threading.md index 01fa107a..d9588f47 100644 --- a/agents/matlab/matlab_agent/docs/threading.md +++ b/agents/matlab/matlab_agent/docs/threading.md @@ -14,6 +14,12 @@ When `RabbitMQManager.start_consuming` is invoked, the calling thread becomes th The simulator thread performs the MATLAB work and uses the broker to publish results. Because `send_message` uses the callback mechanism described above, these results are sent safely through the consumer thread. +Interactive simulations need to read frames from RabbitMQ while the consumer is +busy. To avoid sharing the main connection across threads, the interactive +controller opens its own `BlockingConnection` which is used exclusively by that +thread. Results are still published through the broker so they use the callback +mechanism. + The handler stores a reference to the active simulator. It can call `stop()` in response to a `STOP` command to signal the thread to terminate. Only one simulator is active at a time, simplifying resource management. ## Stopping diff --git a/agents/matlab/matlab_agent/src/core/interactive.py b/agents/matlab/matlab_agent/src/core/interactive.py index 954b3f87..4968ff02 100644 --- a/agents/matlab/matlab_agent/src/core/interactive.py +++ b/agents/matlab/matlab_agent/src/core/interactive.py @@ -9,6 +9,9 @@ from pathlib import Path from select import select from typing import Any, Dict, Optional +import ssl + +import pika import psutil import yaml @@ -24,7 +27,7 @@ DEFAULT_INPUT_PORT, DEFAULT_OUTPUT_PORT, MAX_FILENAME_LENGTH, - EXCHANGE_INPUT_STREAM + EXCHANGE_INPUT_STREAM, ) logger = get_logger() @@ -62,8 +65,7 @@ def accept(self) -> None: self._conn, _ = self._srv.accept() self._conn.setblocking(False) else: - logger.error( - "[INTERACTIVE] Timeout waiting for client connection.") + logger.error("[INTERACTIVE] Timeout waiting for client connection.") raise TimeoutError("No client connection received in time.") def send(self, data: Dict[str, Any]) -> None: @@ -95,7 +97,9 @@ def recv_all(self) -> list[Dict[str, Any]]: continue try: messages.append(json.loads(line.decode())) - except json.JSONDecodeError as exc: # pragma: no cover - logs error and skips invalid message + except ( + json.JSONDecodeError + ) as exc: # pragma: no cover - logs error and skips invalid message logger.error("[INTERACTIVE] Invalid JSON: %s", exc) messages.append({"error": f"Invalid JSON: {str(exc)}"}) # Return the messages list, i.e., all complete decoded JSON objects (plus any error placeholders) available at that moment. @@ -179,9 +183,13 @@ def start(self, pm: PerformanceMonitor) -> None: self.start_time = time.time() self.out_srv.start() self.in_srv.start() - logger.debug("[INTERACTIVE] TCP servers started on %s:%s and %s:%s", - self.out_srv.host, self.out_srv.port, - self.in_srv.host, self.in_srv.port) + logger.debug( + "[INTERACTIVE] TCP servers started on %s:%s and %s:%s", + self.out_srv.host, + self.out_srv.port, + self.in_srv.host, + self.in_srv.port, + ) self._start_matlab() logger.debug("[INTERACTIVE] Waiting for MATLAB to start...") pm.record_matlab_startup_complete() @@ -215,9 +223,7 @@ def _only_inputs(frame: Dict[str, Any]) -> Dict[str, Any]: if isinstance(frame, dict): sim = frame.get("simulation") if isinstance(sim, dict) and "inputs" in sim: - logger.debug( - "[INTERACTIVE] Received inputs: %s", - sim["inputs"]) + logger.debug("[INTERACTIVE] Received inputs: %s", sim["inputs"]) return sim["inputs"] return frame @@ -226,32 +232,67 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: sim = msg_dict["simulation"] stream_key = sim["inputs"]["stream_source"].replace("rabbitmq://", "") - ch = self.broker.channel + # Interactive simulations run in their own thread. Create a dedicated + # connection so all RabbitMQ operations occur on this thread and do + # not interfere with the consumer's connection. + rabbitmq_cfg = self.broker.config.get("rabbitmq", {}) + credentials = pika.PlainCredentials( + rabbitmq_cfg.get("username", "guest"), + rabbitmq_cfg.get("password", "guest"), + ) + vhost = rabbitmq_cfg.get("vhost", "/") + if rabbitmq_cfg.get("tls", False): + context = ssl.create_default_context() + ssl_options = pika.SSLOptions( + context, + rabbitmq_cfg.get("host", "localhost"), + ) + parameters = pika.ConnectionParameters( + host=rabbitmq_cfg.get("host", "localhost"), + port=rabbitmq_cfg.get("port", 5671), + virtual_host=vhost, + credentials=credentials, + ssl_options=ssl_options, + heartbeat=rabbitmq_cfg.get("heartbeat", 600), + ) + else: + parameters = pika.ConnectionParameters( + host=rabbitmq_cfg.get("host", "localhost"), + port=rabbitmq_cfg.get("port", 5672), + virtual_host=vhost, + credentials=credentials, + heartbeat=rabbitmq_cfg.get("heartbeat", 600), + ) + connection = pika.BlockingConnection(parameters) + ch = connection.channel() qname = f"Q.{self.agent_id}.interactive.{self.request_id}" ch.exchange_declare( exchange=EXCHANGE_INPUT_STREAM, exchange_type="topic", - durable=True) + durable=True, + ) ch.queue_declare(queue=qname, durable=True) ch.queue_bind( exchange=EXCHANGE_INPUT_STREAM, queue=qname, - routing_key=stream_key) + routing_key=stream_key, + ) try: while True: - if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None: + if ( + self.out_srv.matlab_proc + and self.out_srv.matlab_proc.poll() is not None + ): logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop") break - method, _ , body = ch.basic_get( - queue=qname, auto_ack=True) + method, _, body = ch.basic_get(queue=qname, auto_ack=True) while method: frame = _parse_frame(body) if frame: # Send the inputs to MATLAB self.in_srv.send(self._only_inputs(frame)) - method, _ , body = ch.basic_get( - queue=qname, auto_ack=True) + method, _, body = ch.basic_get(queue=qname, auto_ack=True) # Receive Responses from MATLAB for resp in self.out_srv.recv_all(): @@ -265,6 +306,10 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None: logger.info("[INTERACTIVE] Interrupted by user") finally: pm.record_simulation_complete() + try: + connection.close() + except Exception: # pragma: no cover - cleanup errors + pass def close(self) -> None: """Close the TCP servers""" @@ -275,8 +320,7 @@ def metadata(self) -> Dict[str, Any]: meta: Dict[str, Any] = {} if self.start_time: meta["execution_time"] = time.time() - self.start_time - meta["memory_usage"] = psutil.Process( - ).memory_info().rss // BYTES_IN_MB + meta["memory_usage"] = psutil.Process().memory_info().rss // BYTES_IN_MB return meta @@ -291,9 +335,7 @@ def handle_interactive_simulation( pm = PerformanceMonitor() sim = msg_dict["simulation"] pm.start_operation(sim["request_id"]) - logger.debug( - "[INTERACTIVE] Starting interactive simulation: %s", - sim["file"]) + logger.debug("[INTERACTIVE] Starting interactive simulation: %s", sim["file"]) controller = MatlabInteractiveController( path_simulation or sim.get("path"), sim["file"], From eb40d90ea72429452a8cdecbcdfe28118b0a0c27 Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Thu, 24 Jul 2025 14:24:05 +0200 Subject: [PATCH 5/5] docs: clarify thread management --- .../matlab/matlab_agent/docs/interactive.md | 20 +++++++++++++--- agents/matlab/matlab_agent/docs/threading.md | 23 ++++++++----------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/agents/matlab/matlab_agent/docs/interactive.md b/agents/matlab/matlab_agent/docs/interactive.md index c1bea9f0..a3b2f9ef 100644 --- a/agents/matlab/matlab_agent/docs/interactive.md +++ b/agents/matlab/matlab_agent/docs/interactive.md @@ -1,6 +1,10 @@ -# Interactive Frame Format +# Interactive Simulation -Frames exchanged during an interactive simulation are YAML documents with the following structure: +An interactive simulation exchanges small data frames with the MATLAB process while it is running. The agent keeps a persistent TCP connection to MATLAB and relays frames received from RabbitMQ to the process. Every response from MATLAB is sent back to the broker so that clients can consume it in real time. + +## Frame Format + +Each message flowing from the broker to the agent must be a YAML document of the form: ```yaml simulation: @@ -8,4 +12,14 @@ simulation: : ``` -The `inputs` section contains key/value pairs that are provided to the MATLAB simulation. Each frame sent from the message broker to the agent should conform to this format. +The `inputs` section contains the parameters that will be provided to the MATLAB simulation step. Frames published by MATLAB follow the same structure but typically include additional fields describing the current state or a completion flag. + +## Workflow + +1. **Start** – When the agent receives an `interactive` simulation request it launches an `InteractiveSimulator` in a dedicated thread. The simulator starts two local TCP servers: one for sending data to MATLAB and one for receiving its outputs. +2. **Connection Setup** – The simulator opens its own `BlockingConnection` to RabbitMQ. A queue named `Q..interactive.` is declared and bound to the routing key defined in the simulation payload (`inputs.stream_source`). +3. **Handshake** – After starting MATLAB, the agent waits for the MATLAB process to connect to both TCP ports. A simple handshake is performed by sending an empty JSON object. +4. **Main Loop** – The simulator continuously polls the dedicated RabbitMQ connection for new frames. Received inputs are forwarded over the TCP socket to MATLAB. Any JSON messages coming from MATLAB are packaged into responses using `create_response` and published with `send_result`. +5. **Completion** – When MATLAB signals that the simulation is finished (for example by sending a frame with `status: completed`) the simulator publishes the final response, closes the TCP servers and its RabbitMQ connection, and exits. + +This separation of connections ensures that no socket is shared between threads and allows the agent to stop the interactive simulation safely if requested. diff --git a/agents/matlab/matlab_agent/docs/threading.md b/agents/matlab/matlab_agent/docs/threading.md index d9588f47..ea4f2e03 100644 --- a/agents/matlab/matlab_agent/docs/threading.md +++ b/agents/matlab/matlab_agent/docs/threading.md @@ -1,27 +1,22 @@ # Thread Management in the MATLAB Agent -The MATLAB agent relies on a dedicated thread to handle RabbitMQ I/O while simulations run in their own worker threads. This separation avoids thread safety issues with `pika` and allows the agent to interrupt simulations cleanly. +The agent separates RabbitMQ communication from MATLAB execution using a simple thread model. All RabbitMQ I/O occurs on a single consumer thread while each simulation runs in its own worker thread. This prevents thread‑safety problems in `pika` and allows the agent to interrupt simulations without blocking message handling. -## Consumer Thread +## Consumer (I/O) Thread -When `RabbitMQManager.start_consuming` is invoked, the calling thread becomes the **I/O thread**. Every message received from RabbitMQ is processed on this thread. Because `pika.BlockingConnection` is not thread safe, any publish operation must occur on this same thread. +`RabbitMQManager.start_consuming()` is called from the main thread of the agent. The method stores the thread that invokes it as the **I/O thread** and starts `pika`'s blocking consumption loop. Because `pika.BlockingConnection` is not thread safe, every publish must happen on this same thread. -`RabbitMQManager.send_message` checks the current thread against the stored I/O thread. If a different thread needs to publish a message (for example from a running simulation), the method schedules the publish with `connection.add_callback_threadsafe`. This guarantees that all communication with RabbitMQ happens on the consumer thread. +`RabbitMQManager.send_message()` checks whether the current thread matches the stored I/O thread. If a worker thread needs to publish a message, the method schedules the publish with `connection.add_callback_threadsafe()`. The callback is then executed on the consumer thread so all RabbitMQ operations are serialized on the connection. ## Simulation Threads -`MessageHandler` creates an instance of `BatchSimulator`, `StreamingSimulator` or `InteractiveSimulator` depending on the message type. Each simulator implements the common `MatlabSimulator` interface. Calling `start()` launches a new thread where the simulator executes its `_execute` logic. +`MessageHandler` creates a concrete implementation of the `MatlabSimulator` interface whenever a new message arrives. The available implementations are `BatchSimulator`, `StreamingSimulator` and `InteractiveSimulator`. Calling `start()` on any simulator launches a new daemon thread and invokes its private `_execute()` method. -The simulator thread performs the MATLAB work and uses the broker to publish results. Because `send_message` uses the callback mechanism described above, these results are sent safely through the consumer thread. +The simulator thread performs the MATLAB computation and uses `RabbitMQManager.send_result()` to report progress or final data. Because `send_result()` relies on `send_message()` the actual publish always occurs on the I/O thread. -Interactive simulations need to read frames from RabbitMQ while the consumer is -busy. To avoid sharing the main connection across threads, the interactive -controller opens its own `BlockingConnection` which is used exclusively by that -thread. Results are still published through the broker so they use the callback -mechanism. +Interactive simulations also need to read frames from RabbitMQ while running. To avoid sharing the consumer connection across threads the interactive controller opens a dedicated `BlockingConnection` inside its thread. Results are still published through the manager so they follow the same safe path. -The handler stores a reference to the active simulator. It can call `stop()` in response to a `STOP` command to signal the thread to terminate. Only one simulator is active at a time, simplifying resource management. +## Active Simulator and Stopping -## Stopping +`MessageHandler` stores the currently running simulator instance in `active_simulator`. Only one simulator can run at a time. When a `STOP` command is received the handler calls `active_simulator.stop()` which clears an internal event. Each `_execute()` loop checks this flag and exits when it is cleared. This mechanism provides a cooperative way to terminate simulations without abruptly killing threads. -A simulator sets an internal event when running. Calling `stop()` clears this event. The `_execute` loop in each implementation checks this flag and exits when it is cleared, ensuring that MATLAB executions can be interrupted without forcing the thread to stop abruptly.