From 47c3a25f68e1739cd3b90afb2c9d41906ab2079c Mon Sep 17 00:00:00 2001 From: Marco Melloni <98281551+marcomelloni@users.noreply.github.com> Date: Fri, 11 Jul 2025 09:21:31 +0200 Subject: [PATCH] Add client info to performance CSV --- simulation_bridge/docs/performance.md | 39 ++++++++++--------- .../protocol_adapters/mqtt/mqtt_adapter.py | 8 +++- .../rabbitmq/rabbitmq_adapter.py | 9 ++++- .../protocol_adapters/rest/rest_adapter.py | 8 +++- .../src/utils/performance_monitor.py | 20 ++++++++-- .../test/unit/test_performance_monitor.py | 18 ++++++--- 6 files changed, 73 insertions(+), 29 deletions(-) diff --git a/simulation_bridge/docs/performance.md b/simulation_bridge/docs/performance.md index 813319f1..34c1e36e 100644 --- a/simulation_bridge/docs/performance.md +++ b/simulation_bridge/docs/performance.md @@ -18,25 +18,28 @@ A new row is appended to the CSV each time the bridge processes a client request ## CSV Schema - Column Descriptions -| Column | Type | Description | -| ------------------------------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | -| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. | -| **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. | -| **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. | -| **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. | -| **Core Sent Input Time** | Float (seconds) | The point when the bridge finished delivering the prepared input to the simulation core. | -| **Number of Results** | Integer | Count of partial results produced by the core (`len(Result Times)`). | -| **Result Times** | List | Semicolon-separated timestamps when each partial result arrives at the bridge. | -| **Result Sent Times** | List | Timestamps when results are sent from the bridge to the client. Same length as Result Times. | -| **Simulation Request Completed Time** | Float (seconds) | When the final byte of the last response is sent. Represents the true end-to-end completion. | -| **CPU Percent** | Float (%) | Instantaneous CPU usage of the bridge process when the row is written. Useful for identifying performance spikes. | -| **Memory RSS (MB)** | Float (MiB) | Resident memory usage of the bridge process at the time of measurement. | -| **Total Duration** | Float (seconds) | `Simulation Request Completed Time - Request Received Time`. The total latency perceived by the client. | -| **Average Result Interval** | Float (seconds) | Average time between successive results in Result Times (0 if fewer than 2 results). Indicates streaming frequency. | -| **Input Overhead** | Float (seconds) | `Core Sent Input Time - Request Received Time`. Time spent in the bridge before core processing begins. | -| **Output Overheads** | List | For each result i: `Result Sent Times[i] - Result Times[i]`. Serialization and transport delay per result chunk. | -| **Total Overheads** | List | `Input Overhead + Output Overheads[i]`. Complete bridge processing time contribution per result chunk. | +| Column | Type | Description | +| ------------------------------------- | --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. | +| **Client ID** | String | Identifier of the requesting client. | +| **Client Protocol** | String | Protocol used to communicate with the bridge (e.g., REST, MQTT). | +| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. | +| **Simulation Type** | String | Type of simulation requested (`batch`, `streaming`, or `interactive`). | +| **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. | +| **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. | +| **Core Sent Input Time** | Float (seconds) | The point when the bridge finished delivering the prepared input to the simulation core. | +| **Number of Results** | Integer | Count of partial results produced by the core (`len(Result Times)`). | +| **Result Times** | List | Semicolon-separated timestamps when each partial result arrives at the bridge. | +| **Result Sent Times** | List | Timestamps when results are sent from the bridge to the client. Same length as Result Times. | +| **Simulation Request Completed Time** | Float (seconds) | When the final byte of the last response is sent. Represents the true end-to-end completion. | +| **CPU Percent** | Float (%) | Instantaneous CPU usage of the bridge process when the row is written. Useful for identifying performance spikes. | +| **Memory RSS (MB)** | Float (MiB) | Resident memory usage of the bridge process at the time of measurement. | +| **Total Duration** | Float (seconds) | `Simulation Request Completed Time - Request Received Time`. The total latency perceived by the client. | +| **Average Result Interval** | Float (seconds) | Average time between successive results in Result Times (0 if fewer than 2 results). Indicates streaming frequency. | +| **Input Overhead** | Float (seconds) | `Core Sent Input Time - Request Received Time`. Time spent in the bridge before core processing begins. | +| **Output Overheads** | List | For each result i: `Result Sent Times[i] - Result Times[i]`. Serialization and transport delay per result chunk. | +| **Total Overheads** | List | `Input Overhead + Output Overheads[i]`. Complete bridge processing time contribution per result chunk. | > **Note:** > > - **Output Overheads** and **Total Overheads** are stored as lists to maintain consistency across batch, streaming, and interactive modes diff --git a/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py b/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py index 4351e13b..dcdda0de 100644 --- a/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py +++ b/simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py @@ -144,7 +144,13 @@ def on_message(self, client, userdata, msg): "MQTT - Processing message %s, from producer: %s, simulator: %s", message, producer, consumer) - performance_monitor.start_operation(operation_id) + simulation_type = simulation.get('type', 'unknown') + performance_monitor.start_operation( + operation_id, + client_id=producer, + protocol='mqtt', + simulation_type=simulation_type + ) # Send signal directly signal('message_received_input_mqtt').send( diff --git a/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py b/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py index b462e41d..0c1b0e0d 100644 --- a/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py +++ b/simulation_bridge/src/protocol_adapters/rabbitmq/rabbitmq_adapter.py @@ -144,7 +144,14 @@ def _process_message(self, ch, method, properties, body, queue_name): operation_id = message.get( 'simulation', {}).get( 'request_id', 'unknown') - performance_monitor.start_operation(operation_id) + simulation_type = message.get( + 'simulation', {}).get('type', 'unknown') + performance_monitor.start_operation( + operation_id, + client_id=producer, + protocol='rabbitmq', + simulation_type=simulation_type + ) elif queue_name == 'Q.bridge.result': operation_id = message.get('request_id', 'unknown') performance_monitor.record_core_received_result(operation_id) diff --git a/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py b/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py index fba0f7f6..dd5b3eaa 100644 --- a/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py +++ b/simulation_bridge/src/protocol_adapters/rest/rest_adapter.py @@ -72,7 +72,13 @@ async def handle_streaming_message() -> Response: 'consumer': consumer } - performance_monitor.start_operation(operation_id) + simulation_type = simulation.get('type', 'unknown') + performance_monitor.start_operation( + operation_id, + client_id=producer, + protocol='rest', + simulation_type=simulation_type + ) signal('message_received_input_rest').send( message=message, diff --git a/simulation_bridge/src/utils/performance_monitor.py b/simulation_bridge/src/utils/performance_monitor.py index 478cf109..5a3b11e9 100644 --- a/simulation_bridge/src/utils/performance_monitor.py +++ b/simulation_bridge/src/utils/performance_monitor.py @@ -16,7 +16,10 @@ @dataclass class PerformanceMetrics: + client_id: str + client_protocol: str operation_id: str + simulation_type: str timestamp: float request_received_time: float core_received_input_time: float @@ -82,8 +85,11 @@ def _write_csv_headers(self) -> None: with self.csv_path.open("w", newline="", encoding="utf-8") as fh: writer = csv.writer(fh) writer.writerow([ - "Operation ID", "Timestamp", + "Client ID", + "Client Protocol", + "Operation ID", + "Simulation Type", "Request Received Time", "Core Received Input Time", "Core Sent Input Time", @@ -111,13 +117,18 @@ def _update_system_metrics(self, metric: PerformanceMetrics) -> None: def _is_valid_operation(self, operation_id: str) -> bool: return self.enabled and operation_id in self.metrics_by_operation_id - def start_operation(self, operation_id: str) -> None: + def start_operation(self, operation_id: str, client_id: str = "unknown", + protocol: str = "unknown", + simulation_type: str = "unknown") -> None: if not self.enabled or operation_id in self.metrics_by_operation_id: return now = time.time() self.metrics_by_operation_id[operation_id] = PerformanceMetrics( + client_id=client_id, + client_protocol=protocol, operation_id=operation_id, + simulation_type=simulation_type, timestamp=now, request_received_time=now, core_received_input_time=0.0, @@ -186,8 +197,11 @@ def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None: with self.csv_path.open("a", newline="", encoding="utf-8") as fh: writer = csv.writer(fh) writer.writerow([ - metric.operation_id, metric.timestamp, + metric.client_id, + metric.client_protocol, + metric.operation_id, + metric.simulation_type, metric.request_received_time, metric.core_received_input_time, metric.core_sent_input_time, diff --git a/simulation_bridge/test/unit/test_performance_monitor.py b/simulation_bridge/test/unit/test_performance_monitor.py index 33d205c6..50b5f9f4 100644 --- a/simulation_bridge/test/unit/test_performance_monitor.py +++ b/simulation_bridge/test/unit/test_performance_monitor.py @@ -71,17 +71,22 @@ def test_initialization_creates_dir_and_file(tmp_path, config_enabled): def test_start_operation_creates_metric(monitor_enabled): op_id = "op1" - monitor_enabled.start_operation(op_id) + monitor_enabled.start_operation( + op_id, client_id="c1", protocol="rest", simulation_type="batch") assert op_id in monitor_enabled.metrics_by_operation_id metric = monitor_enabled.metrics_by_operation_id[op_id] assert metric.operation_id == op_id assert metric.timestamp > 0 + assert metric.client_id == "c1" + assert metric.client_protocol == "rest" + assert metric.simulation_type == "batch" def test_record_timestamps_update_fields(monitor_enabled): op_id = "op2" - monitor_enabled.start_operation(op_id) + monitor_enabled.start_operation( + op_id, client_id="c1", protocol="rest", simulation_type="batch") with patch("time.time", return_value=1234.5): monitor_enabled.record_core_received_input(op_id) @@ -93,7 +98,8 @@ def test_record_timestamps_update_fields(monitor_enabled): def test_record_core_received_result_appends_time_and_updates_metrics(monitor_enabled): op_id = "op3" - monitor_enabled.start_operation(op_id) + monitor_enabled.start_operation( + op_id, client_id="c1", protocol="rest", simulation_type="batch") with ( patch("time.time", return_value=1000.0), patch.object(monitor_enabled, "_update_system_metrics") as mock_update, @@ -106,7 +112,8 @@ def test_record_core_received_result_appends_time_and_updates_metrics(monitor_en def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled): op_id = "op4" - monitor_enabled.start_operation(op_id) + monitor_enabled.start_operation( + op_id, client_id="c1", protocol="rest", simulation_type="batch") metric = monitor_enabled.metrics_by_operation_id[op_id] metric.request_received_time = 1.0 @@ -131,7 +138,8 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled): def test_disabled_monitor_skips_methods(monitor_disabled): op_id = "op_disabled" - monitor_disabled.start_operation(op_id) + monitor_disabled.start_operation( + op_id, client_id="c1", protocol="rest", simulation_type="batch") monitor_disabled.record_core_received_input(op_id) monitor_disabled.record_core_sent_input(op_id) monitor_disabled.record_result_sent(op_id)