|
| 1 | +# A.py (Client and Primary Measurement Node) |
| 2 | +import concore |
| 3 | +import time |
| 4 | +import os |
| 5 | +import psutil |
| 6 | +import sys |
| 7 | + |
| 8 | +# --- ZMQ Initialization --- |
| 9 | +# This REQ socket connects to Node B |
| 10 | +concore.init_zmq_port( |
| 11 | + port_name=PORT_NAME_F1_F2, |
| 12 | + port_type="connect", |
| 13 | + address="tcp://localhost:" + PORT_F1_F2, |
| 14 | + socket_type_str="REQ" |
| 15 | +) |
| 16 | + |
| 17 | +print("Node A client started.") |
| 18 | + |
| 19 | +# --- Measurement Initialization --- |
| 20 | +min_latency = float('inf') |
| 21 | +max_latency = 0.0 |
| 22 | +total_latency = 0.0 |
| 23 | +message_count = 0 |
| 24 | +total_bytes = 0 |
| 25 | +process = psutil.Process(os.getpid()) |
| 26 | +overall_start_time = time.monotonic() |
| 27 | +loop_start_time = 0 |
| 28 | + |
| 29 | +current_value = 0 |
| 30 | +max_value = 100 |
| 31 | + |
| 32 | +while current_value < max_value: |
| 33 | + loop_start_time = time.monotonic() # Start timer for round-trip latency |
| 34 | + print(f"Node A: Sending value {current_value:.2f} to Node B.") |
| 35 | + |
| 36 | + # 1. Send the current value as a request to the pipeline |
| 37 | + concore.write(PORT_NAME_F1_F2, "value", [current_value]) |
| 38 | + total_bytes += sys.getsizeof([current_value]) |
| 39 | + |
| 40 | + # 2. Wait for the final, processed value in reply |
| 41 | + received_data = concore.read(PORT_NAME_F1_F2, "value", [0.0]) |
| 42 | + |
| 43 | + loop_end_time = time.monotonic() |
| 44 | + latency_ms = (loop_end_time - loop_start_time) * 1000 |
| 45 | + |
| 46 | + # Update metrics |
| 47 | + message_count += 1 |
| 48 | + min_latency = min(min_latency, latency_ms) |
| 49 | + max_latency = max(max_latency, latency_ms) |
| 50 | + total_latency += latency_ms |
| 51 | + |
| 52 | + current_value = received_data[0] |
| 53 | + print(f"Node A: Received final value {current_value:.2f} from the pipeline. | Latency: {latency_ms:.2f} ms") |
| 54 | + print("-" * 20) |
| 55 | + |
| 56 | +# --- Finalize and Report Measurements --- |
| 57 | +overall_end_time = time.monotonic() |
| 58 | +total_duration = overall_end_time - overall_start_time |
| 59 | +cpu_usage = process.cpu_percent() / total_duration if total_duration > 0 else 0 |
| 60 | +avg_latency = total_latency / message_count if message_count > 0 else 0 |
| 61 | + |
| 62 | +print("\n" + "="*35) |
| 63 | +print("--- NODE A: END-TO-END RESULTS ---") |
| 64 | +print(f"Total pipeline iterations: {message_count}") |
| 65 | +print(f"Total data sent: {total_bytes / 1024:.4f} KB") |
| 66 | +print(f"Total End-to-End Time: {total_duration:.4f} seconds") |
| 67 | +print("-" * 35) |
| 68 | +print(f"Min round-trip latency: {min_latency:.2f} ms") |
| 69 | +print(f"Avg round-trip latency: {avg_latency:.2f} ms") |
| 70 | +print(f"Max round-trip latency: {max_latency:.2f} ms") |
| 71 | +print("-" * 35) |
| 72 | +print(f"Approximate CPU usage: {cpu_usage:.2f}%") |
| 73 | +print("="*35) |
| 74 | + |
| 75 | +print(f"\nNode A: Final value {current_value:.2f} reached the target. Terminating.") |
| 76 | +concore.terminate_zmq() |
0 commit comments