39 changes: 21 additions & 18 deletions simulation_bridge/docs/performance.md
@@ -18,25 +18,28 @@ A new row is appended to the CSV each time the bridge processes a client request

## CSV Schema - Column Descriptions

| Column | Type | Description |
| ------------------------------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. |
| **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. |
| **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. |
| **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. |
| **Core Sent Input Time** | Float (seconds) | The point when the bridge finished delivering the prepared input to the simulation core. |
| **Number of Results** | Integer | Count of partial results produced by the core (`len(Result Times)`). |
| **Result Times** | List | Semicolon-separated timestamps when each partial result arrives at the bridge. |
| **Result Sent Times** | List | Timestamps when results are sent from the bridge to the client. Same length as Result Times. |
| **Simulation Request Completed Time** | Float (seconds) | When the final byte of the last response is sent. Represents the true end-to-end completion. |
| **CPU Percent** | Float (%) | Instantaneous CPU usage of the bridge process when the row is written. Useful for identifying performance spikes. |
| **Memory RSS (MB)** | Float (MiB) | Resident memory usage of the bridge process at the time of measurement. |
| **Total Duration** | Float (seconds) | `Simulation Request Completed Time - Request Received Time`. The total latency perceived by the client. |
| **Average Result Interval** | Float (seconds) | Average time between successive results in Result Times (0 if fewer than 2 results). Indicates streaming frequency. |
| **Input Overhead** | Float (seconds) | `Core Sent Input Time - Request Received Time`. Time spent in the bridge before core processing begins. |
| **Output Overheads** | List | For each result i: `Result Sent Times[i] - Result Times[i]`. Serialization and transport delay per result chunk. |
| **Total Overheads** | List | `Input Overhead + Output Overheads[i]`. Complete bridge processing time contribution per result chunk. |

| Column | Type | Description |
| ------------------------------------- | --------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Timestamp** | Float (epoch seconds) | Time when `start_operation()` is called. Serves as the reference point for all time deltas. |
| **Client ID** | String | Identifier of the requesting client. |
| **Client Protocol** | String | Protocol used to communicate with the bridge (e.g., REST, MQTT). |
| **Operation ID** | String (UUID) | Unique identifier for each request. Used for cross-referencing logs and traces. |
| **Simulation Type** | String | Type of simulation requested (`batch`, `streaming`, or `interactive`). |
| **Request Received Time** | Float (seconds) | When the bridge's event loop first registers the incoming API call. |
| **Core Received Input Time** | Float (seconds) | The moment when the simulation core acknowledged and buffered the input payload (i.e., when it consumed the data from the bridge). The difference between `Core Received Input Time` and `Request Received Time` quantifies the signal overhead. |
| **Core Sent Input Time** | Float (seconds) | The point when the bridge finished delivering the prepared input to the simulation core. |
| **Number of Results** | Integer | Count of partial results produced by the core (`len(Result Times)`). |
| **Result Times** | List | Semicolon-separated timestamps when each partial result arrives at the bridge. |
| **Result Sent Times** | List | Timestamps when results are sent from the bridge to the client. Same length as Result Times. |
| **Simulation Request Completed Time** | Float (seconds) | When the final byte of the last response is sent. Represents the true end-to-end completion. |
| **CPU Percent** | Float (%) | Instantaneous CPU usage of the bridge process when the row is written. Useful for identifying performance spikes. |
| **Memory RSS (MB)** | Float (MiB) | Resident memory usage of the bridge process at the time of measurement. |
| **Total Duration** | Float (seconds) | `Simulation Request Completed Time - Request Received Time`. The total latency perceived by the client. |
| **Average Result Interval** | Float (seconds) | Average time between successive results in Result Times (0 if fewer than 2 results). Indicates streaming frequency. |
| **Input Overhead** | Float (seconds) | `Core Sent Input Time - Request Received Time`. Time spent in the bridge before core processing begins. |
| **Output Overheads** | List | For each result i: `Result Sent Times[i] - Result Times[i]`. Serialization and transport delay per result chunk. |
| **Total Overheads** | List | `Input Overhead + Output Overheads[i]`. Complete bridge processing time contribution per result chunk. |
> **Note:**
>
> - **Output Overheads** and **Total Overheads** are stored as lists to maintain consistency across batch, streaming, and interactive modes.
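The derived columns in the schema above can be recomputed from the raw timestamp columns. The following is a minimal sketch (the `derive_metrics` helper is illustrative, not part of the bridge; it only mirrors the formulas stated in the table):

```python
def derive_metrics(row: dict) -> dict:
    """Recompute the derived CSV columns from the raw timestamps in one row."""
    # "Result Times" and "Result Sent Times" are semicolon-separated lists.
    result_times = [float(t) for t in row["Result Times"].split(";") if t]
    result_sent_times = [float(t) for t in row["Result Sent Times"].split(";") if t]

    request_received = float(row["Request Received Time"])
    total_duration = float(row["Simulation Request Completed Time"]) - request_received
    input_overhead = float(row["Core Sent Input Time"]) - request_received

    # Per-result serialization/transport delay, then total bridge contribution.
    output_overheads = [sent - recv for sent, recv in zip(result_sent_times, result_times)]
    total_overheads = [input_overhead + o for o in output_overheads]

    # Average gap between successive results; 0 if fewer than 2 results.
    if len(result_times) >= 2:
        gaps = [b - a for a, b in zip(result_times, result_times[1:])]
        avg_interval = sum(gaps) / len(gaps)
    else:
        avg_interval = 0.0

    return {
        "Total Duration": total_duration,
        "Average Result Interval": avg_interval,
        "Input Overhead": input_overhead,
        "Output Overheads": output_overheads,
        "Total Overheads": total_overheads,
    }
```

This makes it easy to sanity-check a written CSV row against the definitions: every derived column should match the recomputed value.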
8 changes: 7 additions & 1 deletion simulation_bridge/src/protocol_adapters/mqtt/mqtt_adapter.py
@@ -144,7 +144,13 @@ def on_message(self, client, userdata, msg):
"MQTT - Processing message %s, from producer: %s, simulator: %s",
message, producer, consumer)

performance_monitor.start_operation(operation_id)
simulation_type = simulation.get('type', 'unknown')
performance_monitor.start_operation(
operation_id,
client_id=producer,
protocol='mqtt',
simulation_type=simulation_type
)

# Send signal directly
signal('message_received_input_mqtt').send(
@@ -144,7 +144,14 @@ def _process_message(self, ch, method, properties, body, queue_name):
operation_id = message.get(
'simulation', {}).get(
'request_id', 'unknown')
performance_monitor.start_operation(operation_id)
simulation_type = message.get(
'simulation', {}).get('type', 'unknown')
performance_monitor.start_operation(
operation_id,
client_id=producer,
protocol='rabbitmq',
simulation_type=simulation_type
)
elif queue_name == 'Q.bridge.result':
operation_id = message.get('request_id', 'unknown')
performance_monitor.record_core_received_result(operation_id)
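The nested `.get` calls in this hunk assume an input message shaped roughly as below. This is an illustrative payload (the field values are hypothetical); the point is that missing keys degrade gracefully to `'unknown'` rather than raising:

```python
# Illustrative shape of a message arriving on the input queue; values are hypothetical.
message = {
    "simulation": {
        "request_id": "3f2a1c",
        "type": "streaming",
    },
}

# Mirrors the adapter's extraction pattern: each level falls back safely.
operation_id = message.get("simulation", {}).get("request_id", "unknown")
simulation_type = message.get("simulation", {}).get("type", "unknown")

# A malformed message without a "simulation" section yields "unknown" for both.
bad_operation_id = {}.get("simulation", {}).get("request_id", "unknown")
```

Using `.get(..., {})` at the intermediate level is what keeps the second `.get` from blowing up on a missing `"simulation"` key.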
8 changes: 7 additions & 1 deletion simulation_bridge/src/protocol_adapters/rest/rest_adapter.py
@@ -72,7 +72,13 @@ async def handle_streaming_message() -> Response:
'consumer': consumer
}

performance_monitor.start_operation(operation_id)
simulation_type = simulation.get('type', 'unknown')
performance_monitor.start_operation(
operation_id,
client_id=producer,
protocol='rest',
simulation_type=simulation_type
)

signal('message_received_input_rest').send(
message=message,
20 changes: 17 additions & 3 deletions simulation_bridge/src/utils/performance_monitor.py
@@ -16,7 +16,10 @@

@dataclass
class PerformanceMetrics:
client_id: str
client_protocol: str
operation_id: str
simulation_type: str
timestamp: float
request_received_time: float
core_received_input_time: float
@@ -82,8 +85,11 @@ def _write_csv_headers(self) -> None:
with self.csv_path.open("w", newline="", encoding="utf-8") as fh:
writer = csv.writer(fh)
writer.writerow([
"Operation ID",
"Timestamp",
"Client ID",
"Client Protocol",
"Operation ID",
"Simulation Type",
"Request Received Time",
"Core Received Input Time",
"Core Sent Input Time",
@@ -111,13 +117,18 @@ def _update_system_metrics(self, metric: PerformanceMetrics) -> None:
def _is_valid_operation(self, operation_id: str) -> bool:
return self.enabled and operation_id in self.metrics_by_operation_id

def start_operation(self, operation_id: str) -> None:
def start_operation(self, operation_id: str, client_id: str = "unknown",
protocol: str = "unknown",
simulation_type: str = "unknown") -> None:
if not self.enabled or operation_id in self.metrics_by_operation_id:
return

now = time.time()
self.metrics_by_operation_id[operation_id] = PerformanceMetrics(
client_id=client_id,
client_protocol=protocol,
operation_id=operation_id,
simulation_type=simulation_type,
timestamp=now,
request_received_time=now,
core_received_input_time=0.0,
@@ -186,8 +197,11 @@ def _save_metrics_to_csv(self, metric: PerformanceMetrics) -> None:
with self.csv_path.open("a", newline="", encoding="utf-8") as fh:
writer = csv.writer(fh)
writer.writerow([
metric.operation_id,
metric.timestamp,
metric.client_id,
metric.client_protocol,
metric.operation_id,
metric.simulation_type,
metric.request_received_time,
metric.core_received_input_time,
metric.core_sent_input_time,
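Because the new `start_operation` parameters default to `"unknown"`, call sites that still pass only `operation_id` keep working unchanged. A self-contained sketch of that signature change (a simplified stand-in for the monitor, with only the fields relevant here):

```python
from dataclasses import dataclass

@dataclass
class Metrics:
    client_id: str
    client_protocol: str
    operation_id: str
    simulation_type: str

ops: dict[str, Metrics] = {}

def start_operation(operation_id: str, client_id: str = "unknown",
                    protocol: str = "unknown",
                    simulation_type: str = "unknown") -> None:
    # Idempotent, as in the monitor: a repeated operation_id is ignored.
    if operation_id in ops:
        return
    ops[operation_id] = Metrics(client_id, protocol, operation_id, simulation_type)

start_operation("op1")  # legacy-style call: metadata defaults to "unknown"
start_operation("op2", client_id="c1", protocol="rest", simulation_type="batch")
```

Defaulting the new keywords rather than making them required is what lets this PR migrate the adapters one by one without breaking any adapter it hasn't touched yet.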
18 changes: 13 additions & 5 deletions simulation_bridge/test/unit/test_performance_monitor.py
@@ -71,17 +71,22 @@ def test_initialization_creates_dir_and_file(tmp_path, config_enabled):

def test_start_operation_creates_metric(monitor_enabled):
op_id = "op1"
monitor_enabled.start_operation(op_id)
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")

assert op_id in monitor_enabled.metrics_by_operation_id
metric = monitor_enabled.metrics_by_operation_id[op_id]
assert metric.operation_id == op_id
assert metric.timestamp > 0
assert metric.client_id == "c1"
assert metric.client_protocol == "rest"
assert metric.simulation_type == "batch"


def test_record_timestamps_update_fields(monitor_enabled):
op_id = "op2"
monitor_enabled.start_operation(op_id)
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")

with patch("time.time", return_value=1234.5):
monitor_enabled.record_core_received_input(op_id)
@@ -93,7 +98,8 @@ def test_record_timestamps_update_fields(monitor_enabled):

def test_record_core_received_result_appends_time_and_updates_metrics(monitor_enabled):
op_id = "op3"
monitor_enabled.start_operation(op_id)
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")
with (
patch("time.time", return_value=1000.0),
patch.object(monitor_enabled, "_update_system_metrics") as mock_update,
@@ -106,7 +112,8 @@ def test_record_core_received_result_appends_time_and_updates_metrics(monitor_enabled):

def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):
op_id = "op4"
monitor_enabled.start_operation(op_id)
monitor_enabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")
metric = monitor_enabled.metrics_by_operation_id[op_id]

metric.request_received_time = 1.0
@@ -131,7 +138,8 @@ def test_finalize_operation_calculates_metrics_and_saves(monitor_enabled):

def test_disabled_monitor_skips_methods(monitor_disabled):
op_id = "op_disabled"
monitor_disabled.start_operation(op_id)
monitor_disabled.start_operation(
op_id, client_id="c1", protocol="rest", simulation_type="batch")
monitor_disabled.record_core_received_input(op_id)
monitor_disabled.record_core_sent_input(op_id)
monitor_disabled.record_result_sent(op_id)