-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathruns.py
More file actions
315 lines (267 loc) · 12.6 KB
/
runs.py
File metadata and controls
315 lines (267 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import os
import sys
import json
import gc
import platform
import time
import multiprocessing
from functools import partial
from typing import Optional
from ml.event_sources import Timer
from ml.tracer import Tracer, analyze_trace_log, merge_trace_logs
from ml.enums import OnFullBehavior, LogLevel, ExecutionMode
from ml import data
from me.serializer import DiagramSerializer
from diagrams.engine import MainWindow
from diagrams.optimization import run_simulated_annealing
from datetime import datetime
from monitor.plot_server import main as run_plot_server
from description import Top
import conf
import types
class Configuration:
    """A simple object to hold and pass simulation configuration."""

    def __init__(self, *configurations):
        """Merge the public, picklable variables of the given modules into this object.

        Module-valued attributes are skipped because module objects cannot be
        pickled, which would otherwise break multiprocessing when this
        configuration object is passed to worker processes.
        """
        for source in configurations:
            for name, attribute in vars(source).items():
                # Skip dunder entries (module machinery, not configuration).
                if name.startswith('__'):
                    continue
                # Skip module objects; they are unpicklable. Everything else
                # (including plain callables) is copied verbatim.
                if isinstance(attribute, types.ModuleType):
                    continue
                setattr(self, name, attribute)
# --- Log constants ---
# Component identifier used when this top-level runner itself emits trace events.
MAIN_COMPONENT_ID = "MAIN"
# Event names for the simulation lifecycle, passed to Tracer.log.
LOG_EVENT_START = "START"
LOG_EVENT_FAILURE = "FAILURE"
LOG_EVENT_SUCCESS = "SUCCESS"
LOG_EVENT_INTERRUPT = "INTERRUPT"
# Key under which the human-readable message is stored in a log's detail dict.
LOG_DETAIL_KEY_MESSAGE = "message"
# Message templates; `{}` placeholders are filled with str.format at call sites.
MSG_SIM_START = "Starting simulation threads on {}"
MSG_SIM_FAILURE = "Simulation finished with errors."
MSG_SIM_SUCCESS = "Simulation finished."
MSG_INTERRUPT = "Ctrl+C received, stopping simulation."
MSG_PLOT_SERVER_STARTING = "Starting plot server process..."
MSG_SHUTDOWN_COMPLETE = "All components terminated. Flushing final logs."
# --- Diagram utility messages ---
# Used by export_diagram / import_diagram for console feedback.
MSG_DIAG_INSTANTIATING = "Instantiating the simulation model..."
MSG_DIAG_PART_FOUND = "Found part to serialize: '{}'"
MSG_DIAG_SERIALIZING = "Serializing part structure to JSON..."
MSG_DIAG_EXPORT_SUCCESS = "\nSuccessfully exported the structure of '{}' to '{}'"
MSG_DIAG_EXPORT_ERROR = "\nError: {}"
MSG_DIAG_FILE_NOT_FOUND = "Error: JSON file '{}' not found. Please run with 'export' argument first."
MSG_DIAG_LOADING = "Loading diagram from '{}'..."
MSG_DIAG_IMPORT_SUCCESS = "Diagram imported successfully."
MSG_DIAG_IMPORT_ERROR = "Error importing diagram: {}"
def set_high_priority():
    """
    Sets the process priority to high to improve timer accuracy for real-time simulation.
    This is crucial for reducing OS-level scheduling jitter.

    Raising priority is best-effort: on POSIX, os.nice(-10) raises
    PermissionError for unprivileged users, and on Windows the optional
    'psutil' dependency may be missing. Neither should abort the whole
    simulation run, so failures are reported as a warning and the run
    continues at normal priority.
    """
    try:
        if platform.system() == "Windows":
            # On Windows, HIGH_PRIORITY_CLASS (0x80) is "High Priority".
            # Note: This requires the 'psutil' library.
            # You may need to run: pip install psutil
            import psutil
            p = psutil.Process(os.getpid())
            p.nice(psutil.HIGH_PRIORITY_CLASS)
        else:
            # On Linux/macOS, a lower nice value means higher priority.
            # This typically requires elevated privileges (e.g. root or CAP_SYS_NICE).
            os.nice(-10)
    except (ImportError, OSError) as e:
        # PermissionError is a subclass of OSError. Degrade gracefully:
        # a normal-priority run is better than no run at all.
        print(f"Warning: could not raise process priority: {e}")
def simulate(trace_filename=None, error_filename=None):
    """
    Runs a full simulation: optional plot server, tracer, model, timer, and
    a robust shutdown sequence.

    Args:
        trace_filename: Optional path for the tracer's trace log output.
        error_filename: Optional path for the tracer's error log output.
    """
    # Create a configuration object that holds all parameters.
    configuration = Configuration(conf)
    plot_server_process = None
    top = None
    timer = None
    # Bug fix: the queues must be defined BEFORE the try block. The finally
    # clause references them unconditionally, so an exception or Ctrl+C that
    # fired before their original in-try assignment (e.g. while the plot
    # server was starting) made the cleanup itself raise a NameError.
    log_queue = None
    error_queue = None
    try:
        if configuration.PLOT:
            print(MSG_PLOT_SERVER_STARTING)
            plot_server_process = multiprocessing.Process(
                target=run_plot_server,
                args=(configuration.POSITION_GRAPH_BOUNDARIES, configuration.SPEED_GRAPH_BOUNDARIES)
            )
            plot_server_process.daemon = True  # Ensure the process exits with the main script
            plot_server_process.start()
            # Wait a moment for the server to initialize and start listening
            time.sleep(1.5)
        # Conditionally start the tracer based on configuration
        if configuration.TRACER_ENABLED:
            # Cross-process queues are only needed when workers are processes.
            if configuration.PARALLEL_EXECUTION_MODE == ExecutionMode.PROCESS:
                log_queue = multiprocessing.Queue()
                error_queue = multiprocessing.Queue()
            Tracer.start(
                level=LogLevel.TRACE,
                flush_interval_seconds=5.0,
                output_file=trace_filename,
                error_file=error_filename,
                log_to_console=True,
                log_queue=log_queue,
                error_queue=error_queue
            )
        data.configure(configuration)
        top = Top('top', conf=configuration, log_queue=log_queue, error_queue=error_queue)
        # Initialize the simulation to set up pybullet and get the time step
        top.init()
        # Create the external timer event source.
        # The `on_full` policy is determined by the REAL_TIME_SIMULATION flag.
        # In real-time mode (`FAIL`), the simulation stops if it cannot keep up
        # with the timer, which is crucial for exposing performance bottlenecks.
        # In non-real-time mode (`DROP`), it drops events to keep running,
        # which can be useful for non-critical runs or debugging.
        on_full_behavior = OnFullBehavior.FAIL if configuration.REAL_TIME_SIMULATION else OnFullBehavior.DROP
        timer = Timer(identifier='physics_timer', interval_seconds=configuration.TIME_STEP, on_full=on_full_behavior, duration_seconds=configuration.DURATION_SECONDS)
        # Connect the timer to the simulation's main event queue
        event_queues = top.get_event_queues()
        top.connect_event_source(timer, event_queues[0].get_identifier())
        Tracer.log(LogLevel.INFO, MAIN_COMPONENT_ID, LOG_EVENT_START, {LOG_DETAIL_KEY_MESSAGE: MSG_SIM_START.format(datetime.now())})
        # Disable the garbage collector to prevent unpredictable pauses during
        # the simulation, which can cause jitter and missed deadlines.
        gc.disable()
        # For real-time simulations, increase process priority to minimize
        # OS scheduler jitter, which can cause timer inaccuracies.
        if configuration.HIGH_PRIORITY:
            set_high_priority()
        # Start the simulation thread, which will spawn worker processes.
        top.start(stop_condition=lambda _: timer.stop_event_is_set())
        # If using persistent processes, block until all workers have confirmed they are initialized.
        # This is not needed for ExecutionMode.THREAD.
        top.wait_for_ready()
        # Now that all components are ready, start the timer to begin event flow.
        timer.start()
        # Block here and wait for the main simulation thread to finish.
        top.wait()
        top.term()
        if top.get_exception() or timer.get_exception():
            Tracer.log(LogLevel.ERROR, MAIN_COMPONENT_ID, LOG_EVENT_FAILURE, {LOG_DETAIL_KEY_MESSAGE: MSG_SIM_FAILURE})
        else:
            Tracer.log(LogLevel.INFO, MAIN_COMPONENT_ID, LOG_EVENT_SUCCESS, {LOG_DETAIL_KEY_MESSAGE: MSG_SIM_SUCCESS})
    except KeyboardInterrupt:
        # On Ctrl+C, log the interrupt and let the finally block handle shutdown.
        Tracer.log(LogLevel.INFO, MAIN_COMPONENT_ID, LOG_EVENT_INTERRUPT, {LOG_DETAIL_KEY_MESSAGE: MSG_INTERRUPT})
    finally:
        # This block ensures cleanup happens on normal exit, Ctrl+C, or any other exception.
        # It centralizes the entire shutdown sequence for maximum robustness.
        # 1. Signal all running components to stop. This is non-blocking.
        if timer:
            timer.stop()
        if top:
            top.stop()
        # Re-enable the garbage collector.
        gc.enable()
        # 2. Wait for components to finish. The order is important for a graceful shutdown.
        #    Wait for event sources first to prevent new work.
        if timer:
            timer.wait()
        # 3. Wait for the main simulation part to finish, then terminate it.
        #    top.term() recursively shuts down all children (threads/processes).
        if top:
            top.wait()
            top.term()
        # 4. Terminate auxiliary processes.
        if plot_server_process and plot_server_process.is_alive():
            plot_server_process.terminate()
        # 5. Stop the tracer last. This is a blocking call that flushes all final logs.
        if configuration.TRACER_ENABLED:
            Tracer.log(LogLevel.INFO, MAIN_COMPONENT_ID, LOG_EVENT_SUCCESS, {LOG_DETAIL_KEY_MESSAGE: MSG_SHUTDOWN_COMPLETE})
            Tracer.stop()
        # Prevent the interpreter from blocking at exit on unflushed queue
        # feeder threads.
        if log_queue:
            log_queue.cancel_join_thread()
        if error_queue:
            error_queue.cancel_join_thread()
def export_diagram(structure_filename=None, part_id=None):
    """
    Instantiates the Multirotor model, serializes its structure to JSON,
    and saves it to a file.

    Args:
        structure_filename (str): The path to save the JSON file.
        part_id (str): The dot-separated ID of the part to export
            (e.g., 'multirotor.controller'). Defaults to the root part.
    """
    print(MSG_DIAG_INSTANTIATING)
    configuration = Configuration(conf)
    data.configure(configuration)
    top = Top('top', conf=configuration)
    # Walk down the part hierarchy to the requested part (default: the root).
    target = top
    if part_id:
        for segment in part_id.split('.'):
            target = target.get_part(segment)
    print(MSG_DIAG_PART_FOUND.format(target.get_full_identifier()))
    serializer = DiagramSerializer()
    try:
        print(MSG_DIAG_SERIALIZING)
        serialized = serializer.export_part_to_json(target)
        with open(structure_filename, 'w') as output:
            output.write(serialized)
        print(MSG_DIAG_EXPORT_SUCCESS.format(target.get_full_identifier(), structure_filename))
    except TypeError as error:
        # Unserializable attributes surface here as TypeError.
        print(MSG_DIAG_EXPORT_ERROR.format(error))
def import_diagram(structure_filename=None):
    """
    Loads a diagram from the JSON file and displays it in the editor.
    """
    # Bail out early if the export step has not produced the file yet.
    if not os.path.exists(structure_filename):
        print(MSG_DIAG_FILE_NOT_FOUND.format(structure_filename))
        return
    print(MSG_DIAG_LOADING.format(structure_filename))
    from PyQt5.QtWidgets import QApplication
    with open(structure_filename, 'r') as source:
        json_data = source.read()
    # A QApplication instance is always required for a Qt app; keep the
    # reference alive for the lifetime of the window.
    qapp = QApplication(sys.argv)
    # --- Configure Optimizer ---
    # Use Simulated Annealing as the layout optimization strategy.
    annealing_parameters = {
        'iterations': 2000,
        'initial_temp': 20.0,
        'cooling_rate': 0.995,
        'move_step_grid_units': 15,
        'intersection_weight': 100.0,
        'wirelength_weight': 0.1
    }
    configured_optimizer = partial(run_simulated_annealing, params=annealing_parameters)
    # Build the main window with the optimizer, then import the diagram data.
    main_window = MainWindow(enable_logging=True, optimizer_func=configured_optimizer)
    serializer = DiagramSerializer()
    try:
        serializer.import_part_from_json(json_data, main_window)
        print(MSG_DIAG_IMPORT_SUCCESS)
    except (ValueError, json.JSONDecodeError) as error:
        print(MSG_DIAG_IMPORT_ERROR.format(error))
        return
    # Start the application event loop to show the window.
    sys.exit(main_window.start())
def analyze_trace(trace_file: str, output_format: str = 'text', output_file: Optional[str] = None):
    """
    Parses and analyzes the trace log file generated by the simulation.

    Args:
        trace_file: Path to the simulation trace log file.
        output_format: The desired output format ('text', 'json', 'json:perfetto').
        output_file: Optional path to write the output to.
    """
    configuration = Configuration(conf)
    if not os.path.exists(trace_file):
        print(f"Error: Trace log file not found at '{trace_file}'")
        return
    # Pick a report title reflecting the configured parallel execution mode.
    mode = configuration.PARALLEL_EXECUTION_MODE
    if mode == ExecutionMode.PROCESS:
        analysis_title = 'multi-process simulation'
    elif mode == ExecutionMode.THREAD:
        analysis_title = 'multi-thread simulation'
    else:
        analysis_title = 'simulation'
    analyze_trace_log(trace_file, output_format=output_format, output_file=output_file, title=analysis_title)
def merge_traces(trace_file_1: str, trace_file_2: str, output_file: str):
    """
    Merges two Perfetto JSON trace files into a single file.

    Args:
        trace_file_1: Path to the first trace file.
        trace_file_2: Path to the second trace file.
        output_file: Path to the output merged trace file.
    """
    # Both inputs must exist before attempting the merge; report the first
    # missing one and abort.
    for candidate in (trace_file_1, trace_file_2):
        if not os.path.exists(candidate):
            print(f"Error: File not found: {candidate}")
            return
    merge_trace_logs(trace_file_1, trace_file_2, output_file)