20 changes: 17 additions & 3 deletions agents/matlab/matlab_agent/docs/interactive.md
@@ -1,11 +1,25 @@
# Interactive Frame Format
# Interactive Simulation

Frames exchanged during an interactive simulation are YAML documents with the following structure:
An interactive simulation exchanges small data frames with the MATLAB process while it is running. The agent keeps a persistent TCP connection to MATLAB and relays frames received from RabbitMQ to the process. Every response from MATLAB is sent back to the broker so that clients can consume it in real time.

## Frame Format

Each message flowing from the broker to the agent must be a YAML document of the form:

```yaml
simulation:
  inputs:
    <key>: <value>
```

The `inputs` section contains key/value pairs that are provided to the MATLAB simulation. Each frame sent from the message broker to the agent should conform to this format.
The `inputs` section contains the parameters that will be provided to the MATLAB simulation step. Frames published by MATLAB follow the same structure but typically include additional fields describing the current state or a completion flag.
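
As a rough illustration of the frame shape, the sketch below treats a frame as the plain Python dict a YAML parser would typically return for the document above and pulls out the `inputs` mapping. The helper name `extract_inputs` and the sample keys `throttle` and `step` are invented for this example; returning the frame unchanged when no `simulation.inputs` section exists mirrors what `_only_inputs` in `interactive.py` appears to do.

```python
from typing import Any, Dict


def extract_inputs(frame: Dict[str, Any]) -> Dict[str, Any]:
    """Return frame['simulation']['inputs'] when present, else the frame itself."""
    if isinstance(frame, dict):
        sim = frame.get("simulation")
        if isinstance(sim, dict) and "inputs" in sim:
            return sim["inputs"]
    return frame


# Hypothetical frame, equivalent to the parsed YAML document shown above.
frame = {"simulation": {"inputs": {"throttle": 0.5, "step": 3}}}
inputs = extract_inputs(frame)

# A frame without the wrapper is passed through untouched.
bare = extract_inputs({"throttle": 0.1})
```

Only the `inputs` mapping would then be forwarded to MATLAB, which keeps the TCP payload small.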

## Workflow

1. **Start** – When the agent receives an `interactive` simulation request, it launches an `InteractiveSimulator` in a dedicated thread. The simulator starts two local TCP servers: one for sending data to MATLAB and one for receiving its outputs.
2. **Connection Setup** – The simulator opens its own `BlockingConnection` to RabbitMQ. A durable queue named `Q.<agent>.interactive.<request_id>` is declared and bound, on the input-stream topic exchange, to the routing key defined in the simulation payload (`inputs.stream_source`, with any `rabbitmq://` prefix removed).
3. **Handshake** – After starting MATLAB, the agent waits for the MATLAB process to connect to both TCP ports. A simple handshake is performed by sending an empty JSON object.
4. **Main Loop** – The simulator continuously polls the dedicated RabbitMQ connection for new frames. Received inputs are forwarded over the TCP socket to MATLAB. Any JSON messages coming from MATLAB are packaged into responses using `create_response` and published with `send_result`.
5. **Completion** – When MATLAB signals that the simulation is finished (for example, by sending a frame with `status: completed`), the simulator publishes the final response, closes the TCP servers and its RabbitMQ connection, and exits.

This separation of connections ensures that no socket is shared between threads and allows the agent to stop the interactive simulation safely if requested.
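
The receiving half of the main loop can be sketched as follows. This assumes MATLAB writes newline-delimited JSON to the socket, which is what the line-by-line decoding in `recv_all` suggests but is not stated in the docs. The function names `split_json_lines` and `is_complete` are hypothetical, and the `status: completed` check is just the example completion signal mentioned in step 5.

```python
import json
from typing import Any, Dict, List, Tuple


def split_json_lines(buffer: bytes) -> Tuple[List[Dict[str, Any]], bytes]:
    """Decode every complete line in buffer; return (messages, unread remainder)."""
    # Everything after the last newline is an incomplete message and is kept.
    *lines, remainder = buffer.split(b"\n")
    messages: List[Dict[str, Any]] = []
    for line in lines:
        if not line.strip():
            continue
        try:
            messages.append(json.loads(line.decode()))
        except json.JSONDecodeError as exc:
            # Keep a placeholder so the consumer can see that a frame was dropped.
            messages.append({"error": f"Invalid JSON: {exc}"})
    return messages, remainder


def is_complete(message: Dict[str, Any]) -> bool:
    """Assumed completion signal: a frame carrying status == 'completed'."""
    return message.get("status") == "completed"


chunk = b'{"t": 1}\n{"status": "completed"}\n{"t": 2'
messages, rest = split_json_lines(chunk)
```

Carrying the remainder over to the next read avoids treating a partially received frame as invalid JSON.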
22 changes: 22 additions & 0 deletions agents/matlab/matlab_agent/docs/threading.md
@@ -0,0 +1,22 @@
# Thread Management in the MATLAB Agent

The agent separates RabbitMQ communication from MATLAB execution using a simple thread model. All RabbitMQ I/O occurs on a single consumer thread while each simulation runs in its own worker thread. This prevents thread‑safety problems in `pika` and allows the agent to interrupt simulations without blocking message handling.

## Consumer (I/O) Thread

`RabbitMQManager.start_consuming()` is called from the main thread of the agent. The method stores the thread that invokes it as the **I/O thread** and starts `pika`'s blocking consumption loop. Because `pika.BlockingConnection` is not thread-safe, every publish must happen on this same thread.

`RabbitMQManager.send_message()` checks whether the current thread matches the stored I/O thread. If a worker thread needs to publish a message, the method schedules the publish with `connection.add_callback_threadsafe()`. The callback is then executed on the consumer thread so all RabbitMQ operations are serialized on the connection.
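
The dispatch rule can be illustrated without a broker. The sketch below is a toy stand-in, not the agent's code: `IoThreadDispatcher`, `process_callbacks`, and the `published` list are hypothetical names, and a `queue.Queue` plays the role that `connection.add_callback_threadsafe()` plays in `pika`.

```python
import queue
import threading
from typing import Callable, List


class IoThreadDispatcher:
    """Toy model of a publisher that may only touch its connection on one thread."""

    def __init__(self) -> None:
        # The constructing thread stands in for the consumer (I/O) thread.
        self._io_thread = threading.current_thread()
        self._callbacks: "queue.Queue[Callable[[], None]]" = queue.Queue()
        self.published: List[str] = []

    def _publish(self, body: str) -> None:
        # Real code would call channel.basic_publish() here.
        assert threading.current_thread() is self._io_thread
        self.published.append(body)

    def send_message(self, body: str) -> bool:
        if threading.current_thread() is not self._io_thread:
            # Worker thread: hand the publish over instead of doing it here.
            self._callbacks.put(lambda: self._publish(body))
            return True  # only means "scheduled", not "delivered"
        self._publish(body)
        return True

    def process_callbacks(self) -> None:
        """Run queued publishes; must be called on the I/O thread."""
        while True:
            try:
                callback = self._callbacks.get_nowait()
            except queue.Empty:
                return
            callback()


dispatcher = IoThreadDispatcher()
worker = threading.Thread(target=dispatcher.send_message, args=("from worker",))
worker.start()
worker.join()
dispatcher.send_message("from io thread")  # published immediately
dispatcher.process_callbacks()             # now the worker's message goes out
```

One consequence worth noting: when the publish is scheduled from a worker thread, the `True` return value only confirms that the callback was queued, so a later publish failure is visible in the log rather than in the return value.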

## Simulation Threads

`MessageHandler` creates a concrete implementation of the `MatlabSimulator` interface whenever a new message arrives. The available implementations are `BatchSimulator`, `StreamingSimulator` and `InteractiveSimulator`. Calling `start()` on any simulator launches a new daemon thread and invokes its private `_execute()` method.

The simulator thread performs the MATLAB computation and uses `RabbitMQManager.send_result()` to report progress or final data. Because `send_result()` relies on `send_message()`, the actual publish always occurs on the I/O thread.

Interactive simulations also need to read frames from RabbitMQ while running. To avoid sharing the consumer connection across threads, the interactive controller opens a dedicated `BlockingConnection` inside its own thread. Results are still published through the manager, so they follow the same safe path.

## Active Simulator and Stopping

`MessageHandler` stores the currently running simulator instance in `active_simulator`. Only one simulator can run at a time. When a `STOP` command is received, the handler calls `active_simulator.stop()`, which clears an internal event. Each `_execute()` loop checks this event and exits once it is cleared. This gives the agent a cooperative way to terminate simulations without abruptly killing threads.
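
A minimal sketch of this cooperative stop, assuming the internal event is a `threading.Event` that is set while the simulator should keep running. `SketchSimulator`, its `steps` counter, and the `join()` helper are hypothetical and only illustrate the pattern; the real `MatlabSimulator` classes are not shown in this diff.

```python
import threading
import time
from typing import Optional


class SketchSimulator:
    """Illustrative simulator whose worker loop exits when its event is cleared."""

    def __init__(self) -> None:
        self._running = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self.steps = 0

    def start(self) -> None:
        self._running.set()
        self._thread = threading.Thread(target=self._execute, daemon=True)
        self._thread.start()

    def is_running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def stop(self) -> None:
        # Cooperative: only request termination, never kill the thread.
        self._running.clear()

    def join(self, timeout: Optional[float] = None) -> None:
        if self._thread is not None:
            self._thread.join(timeout)

    def _execute(self) -> None:
        while self._running.is_set():
            self.steps += 1   # stand-in for one simulation step
            time.sleep(0.01)


sim = SketchSimulator()
sim.start()
time.sleep(0.05)
sim.stop()
sim.join(timeout=1.0)
```

Because the loop only checks the event between steps, a stop request takes effect after the current step finishes, so long-running steps delay shutdown accordingly.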

44 changes: 30 additions & 14 deletions agents/matlab/matlab_agent/src/comm/rabbitmq/message_handler.py
@@ -15,9 +15,15 @@
 from .interfaces import IRabbitMQMessageHandler
 from ...utils.logger import get_logger
 from ...utils.create_response import create_response
-from ...core.batch import handle_batch_simulation
-from ...core.streaming import handle_streaming_simulation
-from ...core.interactive import handle_interactive_simulation
+from ...core.simulator import (
+    MatlabSimulator,
+    BatchSimulator,
+    StreamingSimulator,
+    InteractiveSimulator,
+)
+from ...core.batch import handle_batch_simulation # Backwards compatibility
+from ...core.streaming import handle_streaming_simulation # Backwards compat
+from ...core.interactive import handle_interactive_simulation # Backwards compat
 from ...utils.commands import CommandRegistry

 logger = get_logger()
@@ -99,6 +105,7 @@ def __init__(self, agent_id: str, rabbitmq_manager: Any,
         self.response_templates = self.config.get(
             'response_templates', {})
         self.interactive_queues: Dict[str, queue.Queue] = {}
+        self.active_simulator: Optional[MatlabSimulator] = None

     def get_agent_id(self) -> str:
         """
@@ -145,7 +152,11 @@ def handle_message(
                 cmd = msg_dict['command'].upper()
                 if cmd == 'STOP':
                     # Signal to stop the current simulation
-                    logger.info("Received STOP command, signaling to stop simulation")
+                    logger.info(
+                        "Received STOP command, signaling to stop simulation"
+                    )
+                    if self.active_simulator and self.active_simulator.is_running():
+                        self.active_simulator.stop()
                     CommandRegistry.stop()
                 elif cmd == 'RUN':
                     # Reset the stop event to allow running simulations
@@ -238,34 +249,39 @@ def handle_message(
                 logger.info("Received simulation type: %s", sim_type)
                 # Process based on simulation type
                 if sim_type == 'batch':
-                    handle_batch_simulation(
+                    self.active_simulator = BatchSimulator(
                         msg_dict,
                         source,
                         self.rabbitmq_manager,
                         self.path_simulation,
-                        self.response_templates)
+                        self.response_templates,
+                    )
+                    self.active_simulator.start()
                     ch.basic_ack(delivery_tag=method.delivery_tag)
                 elif sim_type == 'streaming':
                     ch.basic_ack(delivery_tag=method.delivery_tag)
-                    tcp_settings = self.config.get(
-                        'tcp', {})
-                    handle_streaming_simulation(
-                        msg_dict, source,
+                    tcp_settings = self.config.get('tcp', {})
+                    self.active_simulator = StreamingSimulator(
+                        msg_dict,
+                        source,
                         self.rabbitmq_manager,
                         self.path_simulation,
                         self.response_templates,
-                        tcp_settings
+                        tcp_settings,
                     )
+                    self.active_simulator.start()
                 elif sim_type == 'interactive':
                     ch.basic_ack(delivery_tag=method.delivery_tag)
                     tcp_settings = self.config.get('tcp', {})
-                    handle_interactive_simulation(
-                        msg_dict, source,
+                    self.active_simulator = InteractiveSimulator(
+                        msg_dict,
+                        source,
                         self.rabbitmq_manager,
                         self.path_simulation,
                         self.response_templates,
-                        tcp_settings
+                        tcp_settings,
                     )
+                    self.active_simulator.start()
                 else:
                     logger.error("Unknown simulation type: %s", sim_type)
                     error_response = create_response(
47 changes: 29 additions & 18 deletions agents/matlab/matlab_agent/src/comm/rabbitmq/rabbitmq_manager.py
@@ -7,6 +7,7 @@
 import ssl
 import uuid
 from typing import Dict, Any, Callable, Optional
+import threading

 import yaml
 import pika
@@ -37,6 +38,7 @@ def __init__(self, agent_id: str, config: Dict[str, Any]) -> None:
         self.config: Dict[str, Any] = config
         self.connection: Optional[pika.BlockingConnection] = None
         self.channel: Optional[pika.adapters.blocking_connection.BlockingChannel] = None
+        self._io_thread: Optional[threading.Thread] = None
         self.input_queue_name: str = f'Q.sim.{self.agent_id}'
         self.message_handler: Optional[Callable[[
             pika.adapters.blocking_connection.BlockingChannel,
@@ -207,6 +209,7 @@ def start_consuming(self) -> None:
             return

         try:
+            self._io_thread = threading.current_thread()
             self.channel.basic_consume(
                 queue=self.input_queue_name,
                 on_message_callback=self.message_handler
@@ -243,26 +246,34 @@ def send_message(
         Returns:
             bool: True if successful, False otherwise
         """
-        try:
-            self.channel.basic_publish(
-                exchange=exchange,
-                routing_key=routing_key,
-                body=body,
-                properties=properties or pika.BasicProperties(
-                    delivery_mode=2 # Persistent message
+        def _publish() -> bool:
+            try:
+                self.channel.basic_publish(
+                    exchange=exchange,
+                    routing_key=routing_key,
+                    body=body,
+                    properties=properties or pika.BasicProperties(
+                        delivery_mode=2 # Persistent message
+                    )
                 )
-            )
-            logger.debug(
-                "Sent message to exchange %s with routing key %s",
-                exchange,
-                routing_key)
+                logger.debug(
+                    "Sent message to exchange %s with routing key %s",
+                    exchange,
+                    routing_key)
+                return True
+            except pika.exceptions.AMQPError as e:
+                logger.error("Failed to send message: %s", e)
+                return False
+            except Exception as e: # pragma: no cover - unexpected errors
+                logger.error("Unexpected error: %s", e)
+                return False
+
+        if self._io_thread and threading.current_thread() is not self._io_thread:
+            assert self.connection is not None
+            self.connection.add_callback_threadsafe(_publish)
             return True
-        except pika.exceptions.AMQPError as e:
-            logger.error("Failed to send message: %s", e)
-            return False
-        except Exception as e:
-            logger.error("Unexpected error: %s", e)
-            return False
+
+        return _publish()

     def send_result(self, destination: str, result: Dict[str, Any]) -> bool:
         """
88 changes: 65 additions & 23 deletions agents/matlab/matlab_agent/src/core/interactive.py
@@ -9,6 +9,9 @@
 from pathlib import Path
 from select import select
 from typing import Any, Dict, Optional
+import ssl
+
+import pika

 import psutil
 import yaml
@@ -24,7 +27,7 @@
     DEFAULT_INPUT_PORT,
     DEFAULT_OUTPUT_PORT,
     MAX_FILENAME_LENGTH,
-    EXCHANGE_INPUT_STREAM
+    EXCHANGE_INPUT_STREAM,
 )

 logger = get_logger()
@@ -62,8 +65,7 @@ def accept(self) -> None:
             self._conn, _ = self._srv.accept()
             self._conn.setblocking(False)
         else:
-            logger.error(
-                "[INTERACTIVE] Timeout waiting for client connection.")
+            logger.error("[INTERACTIVE] Timeout waiting for client connection.")
             raise TimeoutError("No client connection received in time.")

     def send(self, data: Dict[str, Any]) -> None:
@@ -95,7 +97,9 @@ def recv_all(self) -> list[Dict[str, Any]]:
                 continue
             try:
                 messages.append(json.loads(line.decode()))
-            except json.JSONDecodeError as exc: # pragma: no cover - logs error and skips invalid message
+            except (
+                json.JSONDecodeError
+            ) as exc: # pragma: no cover - logs error and skips invalid message
                 logger.error("[INTERACTIVE] Invalid JSON: %s", exc)
                 messages.append({"error": f"Invalid JSON: {str(exc)}"})
         # Return the messages list, i.e., all complete decoded JSON objects (plus any error placeholders) available at that moment.
@@ -179,9 +183,13 @@ def start(self, pm: PerformanceMonitor) -> None:
         self.start_time = time.time()
         self.out_srv.start()
         self.in_srv.start()
-        logger.debug("[INTERACTIVE] TCP servers started on %s:%s and %s:%s",
-                     self.out_srv.host, self.out_srv.port,
-                     self.in_srv.host, self.in_srv.port)
+        logger.debug(
+            "[INTERACTIVE] TCP servers started on %s:%s and %s:%s",
+            self.out_srv.host,
+            self.out_srv.port,
+            self.in_srv.host,
+            self.in_srv.port,
+        )
         self._start_matlab()
         logger.debug("[INTERACTIVE] Waiting for MATLAB to start...")
         pm.record_matlab_startup_complete()
@@ -215,9 +223,7 @@ def _only_inputs(frame: Dict[str, Any]) -> Dict[str, Any]:
         if isinstance(frame, dict):
             sim = frame.get("simulation")
             if isinstance(sim, dict) and "inputs" in sim:
-                logger.debug(
-                    "[INTERACTIVE] Received inputs: %s",
-                    sim["inputs"])
+                logger.debug("[INTERACTIVE] Received inputs: %s", sim["inputs"])
                 return sim["inputs"]
         return frame

@@ -226,32 +232,67 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None:
         sim = msg_dict["simulation"]
         stream_key = sim["inputs"]["stream_source"].replace("rabbitmq://", "")

-        ch = self.broker.channel
+        # Interactive simulations run in their own thread. Create a dedicated
+        # connection so all RabbitMQ operations occur on this thread and do
+        # not interfere with the consumer's connection.
+        rabbitmq_cfg = self.broker.config.get("rabbitmq", {})
+        credentials = pika.PlainCredentials(
+            rabbitmq_cfg.get("username", "guest"),
+            rabbitmq_cfg.get("password", "guest"),
+        )
+        vhost = rabbitmq_cfg.get("vhost", "/")
+        if rabbitmq_cfg.get("tls", False):
+            context = ssl.create_default_context()
+            ssl_options = pika.SSLOptions(
+                context,
+                rabbitmq_cfg.get("host", "localhost"),
+            )
+            parameters = pika.ConnectionParameters(
+                host=rabbitmq_cfg.get("host", "localhost"),
+                port=rabbitmq_cfg.get("port", 5671),
+                virtual_host=vhost,
+                credentials=credentials,
+                ssl_options=ssl_options,
+                heartbeat=rabbitmq_cfg.get("heartbeat", 600),
+            )
+        else:
+            parameters = pika.ConnectionParameters(
+                host=rabbitmq_cfg.get("host", "localhost"),
+                port=rabbitmq_cfg.get("port", 5672),
+                virtual_host=vhost,
+                credentials=credentials,
+                heartbeat=rabbitmq_cfg.get("heartbeat", 600),
+            )
+        connection = pika.BlockingConnection(parameters)
+        ch = connection.channel()
         qname = f"Q.{self.agent_id}.interactive.{self.request_id}"
         ch.exchange_declare(
             exchange=EXCHANGE_INPUT_STREAM,
             exchange_type="topic",
-            durable=True)
+            durable=True,
+        )
         ch.queue_declare(queue=qname, durable=True)
         ch.queue_bind(
             exchange=EXCHANGE_INPUT_STREAM,
             queue=qname,
-            routing_key=stream_key)
+            routing_key=stream_key,
+        )

         try:
             while True:
-                if self.out_srv.matlab_proc and self.out_srv.matlab_proc.poll() is not None:
+                if (
+                    self.out_srv.matlab_proc
+                    and self.out_srv.matlab_proc.poll() is not None
+                ):
                     logger.debug("[INTERACTIVE] MATLAB process ended, stopping loop")
                     break
-                method, _ , body = ch.basic_get(
-                    queue=qname, auto_ack=True)
+                method, _, body = ch.basic_get(queue=qname, auto_ack=True)
                 while method:
                     frame = _parse_frame(body)
                     if frame:
                         # Send the inputs to MATLAB
                         self.in_srv.send(self._only_inputs(frame))
-                    method, _ , body = ch.basic_get(
-                        queue=qname, auto_ack=True)
+                    method, _, body = ch.basic_get(queue=qname, auto_ack=True)

                 # Receive Responses from MATLAB
                 for resp in self.out_srv.recv_all():
@@ -265,6 +306,10 @@ def run(self, pm: PerformanceMonitor, msg_dict: Dict[str, Any]) -> None:
             logger.info("[INTERACTIVE] Interrupted by user")
         finally:
             pm.record_simulation_complete()
+            try:
+                connection.close()
+            except Exception: # pragma: no cover - cleanup errors
+                pass

     def close(self) -> None:
         """Close the TCP servers"""
@@ -275,8 +320,7 @@ def metadata(self) -> Dict[str, Any]:
         meta: Dict[str, Any] = {}
         if self.start_time:
             meta["execution_time"] = time.time() - self.start_time
-        meta["memory_usage"] = psutil.Process(
-        ).memory_info().rss // BYTES_IN_MB
+        meta["memory_usage"] = psutil.Process().memory_info().rss // BYTES_IN_MB
         return meta


@@ -291,9 +335,7 @@ def handle_interactive_simulation(
     pm = PerformanceMonitor()
     sim = msg_dict["simulation"]
     pm.start_operation(sim["request_id"])
-    logger.debug(
-        "[INTERACTIVE] Starting interactive simulation: %s",
-        sim["file"])
+    logger.debug("[INTERACTIVE] Starting interactive simulation: %s", sim["file"])
     controller = MatlabInteractiveController(
         path_simulation or sim.get("path"),
         sim["file"],