diff --git a/ChangeLog.md b/ChangeLog.md deleted file mode 100644 index f0cf6fa..0000000 --- a/ChangeLog.md +++ /dev/null @@ -1,12 +0,0 @@ -# FTIO ChangeLog - -## Version 0.0.2 -- Set the default plot unit to Bytes or Bytes/s rather than MB or MB/s -- Adjusted the plot script to automatically detect the best unit for the y-axis and scale the values accordingly - - -## Version 0.0.1 - -- Speed-up with Msgpack -- Added autocorrelation to FTIO -- Added 4 new outlier detection methods \ No newline at end of file diff --git a/Makefile b/Makefile index 64576bc..717c592 100644 --- a/Makefile +++ b/Makefile @@ -116,7 +116,7 @@ test: check_style: check_tools black . - isort . + ruff check --fix # flake8 . check_tools: diff --git a/docs/approach.md b/docs/approach.md index 15cbafd..0dd6efc 100644 --- a/docs/approach.md +++ b/docs/approach.md @@ -34,7 +34,7 @@ An overview of the core of FTIO is provided below:
<br>
- ## Online Prediction + ## Online Prediction The tool [`predictor`](https://github.com/tuda-parallel/FTIO/tree/main/ftio/cli/predictor.py) launches `ftio` in a loop. It monitors a file that contains bandwidth values over time. Whenever the file changes, `ftio` is called and a new prediction is generated. `predictor` performs a few additional steps compared to `ftio`: * FTIO results are merged into frequency ranges using DB-Scan @@ -51,6 +51,28 @@ An overview of predictor.py is provided in the image below:
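The following is a minimal sketch of the monitor-and-merge cycle described above, not the actual implementation: `run_ftio_once` is a hypothetical stand-in for an `ftio` invocation, and the DBSCAN epsilon is an illustrative choice.

```python
import os
import time

import numpy as np
from sklearn.cluster import DBSCAN


def run_ftio_once(path: str) -> float | None:
    """Hypothetical placeholder: run ftio on `path` and return the dominant frequency (Hz)."""
    return 2.0  # fixed value for illustration only


def merge_frequencies(freqs: list[float], eps: float = 0.1) -> list[float]:
    """Cluster nearby dominant frequencies with DBSCAN; return one center per range."""
    labels = DBSCAN(eps=eps, min_samples=1).fit_predict(np.asarray(freqs).reshape(-1, 1))
    return [
        float(np.mean([f for f, lab in zip(freqs, labels) if lab == k]))
        for k in sorted(set(labels))
    ]


def monitor(path: str, poll_interval: float = 0.5) -> None:
    """Watch `path`; whenever it changes, predict again and merge the history."""
    last_mtime, history = 0.0, []
    while True:
        if os.path.exists(path):
            mtime = os.path.getmtime(path)
            if mtime != last_mtime:  # file changed -> new bandwidth samples arrived
                last_mtime = mtime
                freq = run_ftio_once(path)
                if freq:
                    history.append(freq)
                    print("merged frequency ranges:", merge_frequencies(history))
        time.sleep(poll_interval)
```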
+### Usage Examples + +```bash +# Basic usage: X = number of MPI ranks +predictor X.jsonl -e no -f 100 + +# With window adaptation based on frequency hits +predictor X.jsonl -e no -f 100 -w frequency_hits + +# With change point detection (ADWIN algorithm, default) +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation adwin + +# With CUSUM or Page-Hinkley change point detection +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation cusum +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation ph + +# With GUI dashboard visualization (works with any algorithm) +ftio-gui # Start dashboard first in separate terminal +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation adwin --gui +``` + +For more details on change point detection algorithms, see [Change Point Detection](change_point_detection.md).
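To try the commands above without a real trace, a synthetic periodic bandwidth file can be generated. The JSONL field names (`t`, `bandwidth`) below are assumptions made for illustration; match them to the format your monitoring setup actually writes.

```python
import json
import math

# 20 s of bandwidth sampled at 100 Hz, carrying a 2 Hz periodic I/O phase.
with open("X.jsonl", "w") as f:
    for i in range(2000):
        t = i * 0.01
        bandwidth = 5e8 * (1 + math.cos(2 * math.pi * 2.0 * t))  # bytes/s (assumed unit)
        f.write(json.dumps({"t": t, "bandwidth": bandwidth}) + "\n")
```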

diff --git a/docs/change_point_detection.md b/docs/change_point_detection.md new file mode 100644 index 0000000..f02f8c6 --- /dev/null +++ b/docs/change_point_detection.md @@ -0,0 +1,189 @@ +# Change Point Detection for Online I/O Pattern Analysis + +This document describes the adaptive change point detection feature for FTIO's online predictor, which enables automatic detection of I/O pattern changes during streaming analysis. + +## Overview + +Change point detection allows FTIO to automatically detect when the I/O pattern changes during online prediction. When a change is detected, the analysis window is adapted to focus on the new pattern, improving prediction accuracy. + +Three algorithms are available: +- **ADWIN** (Adaptive Windowing) - Default +- **CUSUM** (Cumulative Sum) +- **Page-Hinkley** (Sequential change detection) + +## Usage + +### Command Line + +There are two configuration modes: + +**Pure change point detection**: +```bash +# Only change point detection, no hit-based optimization +predictor X.jsonl -e no -f 100 --online_adaptation adwin +predictor X.jsonl -e no -f 100 --online_adaptation cusum +predictor X.jsonl -e no -f 100 --online_adaptation ph +``` + +**Hybrid mode**: +```bash +# Change point detection + hit-based optimization +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation adwin +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation cusum +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation ph +``` + +### Configuration Modes Explained + +| Mode | Flags | +|------|-------| +| Pure | `--online_adaptation ` | +| Hybrid | `-w frequency_hits --online_adaptation ` | + +In **hybrid mode**, the two mechanisms are complementary: +- **Change point detection** handles pattern transitions (primary mechanism) +- **Hit-based** optimizes stable periods by shrinking the window (secondary optimization) + +Hit-based only activates when change point detection reports no change. They do not interfere with each other. + +## Algorithms + +### ADWIN (Adaptive Windowing) + +ADWIN uses Hoeffding bounds to detect statistically significant changes in the frequency distribution. + +**How it works:** +1. Maintains a sliding window of frequency observations +2. Tests all possible cut points in the window +3. Uses Hoeffding inequality to determine if the means differ significantly +4. When change detected, discards old data and adapts window + +**Parameters:** +- `delta` (default: 0.05): Confidence parameter. Lower values = higher confidence required for detection + + +### AV-CUSUM (Adaptive-Variance Cumulative Sum) + +CUSUM tracks cumulative deviations from a reference value, with adaptive thresholds based on data variance. + +**How it works:** +1. Establishes reference frequency from initial observations +2. Calculates positive and negative cumulative sums: `S+ = max(0, S+ + x - μ - k)` and `S- = max(0, S- - x + μ - k)` +3. Detects change when cumulative sum exceeds adaptive threshold (2σ) +4. Drift parameter (k = 0.5σ) prevents small fluctuations from accumulating + +**Parameters:** +- `window_size` (default: 50): Size of rolling window for variance calculation + + +### STPH (Self-Tuning Page-Hinkley) + +Page-Hinkley uses a running mean as reference and detects when observations deviate significantly. + +**How it works:** +1. Maintains running mean as reference +2. Tracks cumulative positive and negative differences from reference +3. Uses adaptive threshold and delta based on rolling standard deviation +4. 
Detects change when cumulative difference exceeds threshold + +**Parameters:** +- `window_size` (default: 10): Size of rolling window for variance calculation + + +## Window Adaptation + +When a change point is detected: + +1. **Exact timestamp** of the change is recorded +2. **Analysis window** is adapted to start from the change point +3. **Safety bounds** ensure minimum window size (0.5-1.0 seconds) +4. **Maximum lookback** limits window to prevent using stale data (10 seconds) + +``` +Before change detection: +|-------- old pattern --------|-- new pattern --| + ^ change point + +After adaptation: + |-- new pattern --| + ^ analysis starts here +``` + +## GUI Dashboard + +A real-time visualization dashboard is available for monitoring predictions and change points. + +### Starting the Dashboard + +```bash +# Install GUI dependencies (if not already installed) +pip install -e .[gui] + +# 1. Start the GUI dashboard first +ftio-gui + +# 2. Then run predictor with --gui flag to forward data to the dashboard +# Pure mode: +predictor X.jsonl -e no -f 100 --online_adaptation adwin --gui +# Or hybrid mode: +predictor X.jsonl -e no -f 100 -w frequency_hits --online_adaptation adwin --gui +``` + +The dashboard runs on `http://localhost:8050` and displays: +- Frequency timeline with change point markers +- Continuous cosine wave visualization +- Statistics (total predictions, changes detected, current frequency) + +### Dashboard Features + +- **Auto-updating**: Refreshes automatically as new predictions arrive +- **Change point markers**: Red vertical lines indicate detected changes +- **Frequency annotations**: Shows old → new frequency at each change +- **Gap visualization**: Displays periods with no detected frequency +- **Auto-connect**: The predictor automatically connects to the GUI when `--gui` flag is used + + + +### Algorithm Selection + +Algorithm is selected via the `--online_adaptation` flag: + +| Flag Value | Algorithm | Description | +|------------|-----------|-------------| +| `adwin` | ADWIN | Statistical guarantees with Hoeffding bounds | +| `cusum` | AV-CUSUM | Rapid detection with adaptive variance | +| `ph` | Page-Hinkley | Sequential detection with running mean | + +## Call Tree + +### Change Point Detection Call Tree +``` +ftio/cli/predictor.py::main() +└── ftio/prediction/processes.py::predictor_with_processes() + └── ftio/prediction/online_analysis.py::ftio_process() + └── online_analysis.py::window_adaptation() + ├── online_analysis.py::get_change_detector() + │ ├── ftio/prediction/change_point_detection.py::ChangePointDetector() # ADWIN + │ ├── ftio/prediction/change_point_detection.py::CUSUMDetector() # CUSUM + │ └── ftio/prediction/change_point_detection.py::SelfTuningPageHinkleyDetector() # PH + └── change_point_detection.py::detect_pattern_change_adwin() # or _cusum() or _pagehinkley() + └── ChangePointDetector::add_prediction() + └── ChangePointDetector::_detect_change() +``` + +### GUI Integration Call Tree +``` +ftio/cli/predictor.py::main() +├── ftio/prediction/online_analysis.py::init_socket_logger() +│ └── online_analysis.py::SocketLogger() +└── ftio/prediction/processes.py::predictor_with_processes() + └── ftio/prediction/online_analysis.py::ftio_process() + └── online_analysis.py::log_to_gui_and_console() + └── online_analysis.py::get_socket_logger() + └── SocketLogger::send_log() # Sends to ftio-gui dashboard + +ftio/gui/dashboard.py::main() # ftio-gui command +└── FTIODashApp::run() + ├── ftio/gui/socket_listener.py::SocketListener() # Receives from predictor 
+ └── FTIODashApp::_create_cosine_timeline_plot() # Renders merged view +``` diff --git a/docs/contributing.md b/docs/contributing.md index d6ed601..615aa9d 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -386,5 +386,7 @@ We sincerely thank the following contributors for their valuable contributions: - [Jean-Baptiste Besnard](https://github.com/besnardjb): Metric proxy integration - [Anton Holderied](https://github.com/AntonBeasis): bachelor thesis -> new periodicity score - [Tim Dieringer](https://github.com/Tim-Dieringer): bachelor thesis -> Additional integration for Metric Proxy +- [Amine Aherbil](https://github.com/amineaherbil): bachelor thesis -> adaptive change point detection - [Julian Opper](https://github.com/JulianOpper), [Luca Schultze](https://github.com/lucasch03): PPT Lab -> Improved CI/CD pipeline + diff --git a/ftio/api/gekkoFs/predictor_gekko.py b/ftio/api/gekkoFs/predictor_gekko.py index 425cdbf..e6b8c87 100644 --- a/ftio/api/gekkoFs/predictor_gekko.py +++ b/ftio/api/gekkoFs/predictor_gekko.py @@ -111,7 +111,10 @@ def prediction_process( # display results text = display_result(freq, prediction, shared_resources=shared_resources) # data analysis to decrease window - text += window_adaptation(parsed_args, prediction, freq, shared_resources) + adaptation_text, _, _ = window_adaptation( + parsed_args, prediction, freq, shared_resources + ) + text += adaptation_text console.print(text) while not shared_resources.queue.empty(): shared_resources.data.append(shared_resources.queue.get()) diff --git a/ftio/api/gekkoFs/predictor_gekko_zmq.py b/ftio/api/gekkoFs/predictor_gekko_zmq.py index fb22787..5880acb 100644 --- a/ftio/api/gekkoFs/predictor_gekko_zmq.py +++ b/ftio/api/gekkoFs/predictor_gekko_zmq.py @@ -148,7 +148,10 @@ def prediction_zmq_process( # display results text = display_result(freq, prediction, shared_resources) # data analysis to decrease window and thus change start_time - text += window_adaptation(parsed_args, prediction, freq, shared_resources) + adaptation_text, _, _ = window_adaptation( + parsed_args, prediction, freq, shared_resources + ) + text += adaptation_text # print text console.print(text) diff --git a/ftio/cli/predictor.py b/ftio/cli/predictor.py index ee45e4e..97eac0d 100644 --- a/ftio/cli/predictor.py +++ b/ftio/cli/predictor.py @@ -5,6 +5,7 @@ import sys from ftio.parse.helper import print_info +from ftio.prediction.online_analysis import init_socket_logger from ftio.prediction.pools import predictor_with_pools from ftio.prediction.processes import predictor_with_processes from ftio.prediction.processes_zmq import predictor_with_processes_zmq @@ -22,6 +23,12 @@ def main(args: list[str] = sys.argv) -> None: shared_resources = SharedResources() mode = "procs" # "procs" or "pool" + # Initialize GUI socket logger if --gui flag is present + gui_enabled = "--gui" in args + init_socket_logger(gui_enabled) + if gui_enabled: + print("[INFO] GUI mode enabled - forwarding predictions to ftio-gui dashboard") + if "pool" in mode.lower(): # prediction with a Pool of processes and a callback mechanism predictor_with_pools(shared_resources, args) diff --git a/ftio/freq/discretize.py b/ftio/freq/discretize.py index 4c541e3..0ef120a 100644 --- a/ftio/freq/discretize.py +++ b/ftio/freq/discretize.py @@ -64,7 +64,6 @@ def sample_data( text += f"Recommended sampling frequency: {freq:.3e} Hz\n" # Apply limit if freq is negative N = int(np.floor((t[-1] - t[0]) * freq)) - # N = N + 1 if N != 0 else 0 # include end point limit_N = int(memory_limit //
np.dtype(np.float64).itemsize) text += f"memory limit: {memory_limit/ 1000**3:.3e} GB ({limit_N} samples)\n" if limit_N < N: @@ -73,9 +72,8 @@ def sample_data( text += f"[yellow]Adjusted sampling frequency due to memory limit: {freq:.3e} Hz[/])\n" else: text += f"Sampling frequency: {freq:.3e} Hz\n" - # Compute the number of samples + # Compute number of samples N = int(np.floor((t[-1] - t[0]) * freq)) - # N = N + 1 if N != 0 else 0 # include end point text += f"Expected samples: {N}\n" # print(" '-> \033[1;Start time: %f s \033[1;0m"%t[0]) diff --git a/ftio/gui/__init__.py b/ftio/gui/__init__.py new file mode 100644 index 0000000..1383f55 --- /dev/null +++ b/ftio/gui/__init__.py @@ -0,0 +1,41 @@ +""" +FTIO GUI Dashboard for real-time prediction visualization. + +This module provides a Dash-based web dashboard for visualizing FTIO predictions +and change point detection results in real-time. + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE + +Requires: pip install ftio-hpc[gui] +""" + +__all__ = [ + "FTIODashApp", + "PredictionData", + "ChangePoint", + "PredictionDataStore", + "SocketListener", +] + + +def __getattr__(name): + """Lazy import to avoid requiring dash unless actually used.""" + if name == "FTIODashApp": + from ftio.gui.dashboard import FTIODashApp + + return FTIODashApp + elif name == "SocketListener": + from ftio.gui.socket_listener import SocketListener + + return SocketListener + elif name in ("PredictionData", "ChangePoint", "PredictionDataStore"): + from ftio.gui import data_models + + return getattr(data_models, name) + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/ftio/gui/dashboard.py b/ftio/gui/dashboard.py new file mode 100644 index 0000000..d489741 --- /dev/null +++ b/ftio/gui/dashboard.py @@ -0,0 +1,690 @@ +""" +Main Dash application for FTIO prediction visualization. + +This module provides a real-time dashboard for visualizing FTIO predictions +and change point detection results. It includes auto-updating visualizations, +statistics display, and prediction selection controls. + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. 
+For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +import argparse +import time + +import dash +import numpy as np +from dash import Input, Output, callback_context, dcc, html + +from ftio.gui.data_models import PredictionDataStore +from ftio.gui.socket_listener import SocketListener +from ftio.gui.visualizations import CosineWaveViz + + +class FTIODashApp: + """Main Dash application for FTIO prediction visualization""" + + def __init__(self, host="localhost", port=8050, socket_port=9999): + self.app = dash.Dash(__name__) + self.host = host + self.port = port + self.socket_port = socket_port + + self.data_store = PredictionDataStore() + self.selected_prediction_id = None + self.last_update = time.time() + + self.socket_listener = SocketListener( + port=socket_port, data_callback=self._on_data_received + ) + + self._setup_layout() + self._setup_callbacks() + + self.socket_thread = self.socket_listener.start_in_thread() + + print(f"FTIO Dashboard starting on http://{host}:{port}") + print(f"Socket listener on port {socket_port}") + + def _setup_layout(self): + """Setup the Dash app layout""" + + self.app.layout = html.Div( + [ + html.Div( + [ + html.H1( + "FTIO Prediction Visualizer", + style={ + "textAlign": "center", + "color": "#2c3e50", + "marginBottom": "20px", + }, + ), + html.Div( + [ + html.P( + f"Socket listening on port {self.socket_port}", + style={ + "textAlign": "center", + "color": "#7f8c8d", + "margin": "0", + }, + ), + html.P( + id="connection-status", + children="Waiting for predictions...", + style={ + "textAlign": "center", + "color": "#e74c3c", + "margin": "0", + }, + ), + ] + ), + ], + style={"marginBottom": "30px"}, + ), + html.Div( + [ + html.Div( + [ + html.Label("View Mode:"), + dcc.Dropdown( + id="view-mode", + options=[ + { + "label": "Dashboard (Merged Cosine Wave)", + "value": "dashboard", + }, + { + "label": "Individual Prediction (Single Wave)", + "value": "cosine", + }, + ], + value="dashboard", + style={"width": "250px"}, + ), + ], + style={"display": "inline-block", "marginRight": "20px"}, + ), + html.Div( + [ + html.Label("Select Prediction:"), + dcc.Dropdown( + id="prediction-selector", + options=[], + value=None, + placeholder="Select prediction for cosine view", + style={"width": "250px"}, + ), + ], + style={"display": "inline-block", "marginRight": "20px"}, + ), + html.Div( + [ + html.Button( + "Clear Data", + id="clear-button", + n_clicks=0, + style={ + "backgroundColor": "#e74c3c", + "color": "white", + "border": "none", + "padding": "8px 16px", + "cursor": "pointer", + }, + ), + ], + style={"display": "inline-block"}, + ), + ], + style={ + "textAlign": "center", + "marginBottom": "20px", + "padding": "20px", + "backgroundColor": "#ecf0f1", + "borderRadius": "5px", + }, + ), + html.Div(id="stats-bar", style={"marginBottom": "20px"}), + html.Div(id="main-viz", style={"height": "600px"}), + html.Div( + [ + html.Hr(), + html.H3( + "All Predictions", + style={"color": "#2c3e50", "marginTop": "30px"}, + ), + html.Div( + id="recent-predictions-table", + style={ + "maxHeight": "400px", + "overflowY": "auto", + "border": "1px solid #ddd", + "borderRadius": "8px", + "padding": "10px", + "backgroundColor": "#f9f9f9", + }, + ), + ], + style={"marginTop": "20px"}, + ), + dcc.Interval( + id="interval-component", + interval=2000, # Update every 2 seconds + n_intervals=0, + ), + dcc.Store(id="data-store-trigger"), + ] + ) + + def _setup_callbacks(self): + """Setup Dash callbacks""" + + 
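+        # Note: Dash allows each Output to be owned by only one callback, so the
+        # main figure, dropdown options, connection status, and stats bar are all
+        # bundled into the single callback below and refreshed together whenever
+        # the 2 s dcc.Interval ticks or one of the controls changes.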
@self.app.callback( + [ + Output("main-viz", "children"), + Output("prediction-selector", "options"), + Output("prediction-selector", "value"), + Output("connection-status", "children"), + Output("connection-status", "style"), + Output("stats-bar", "children"), + ], + [ + Input("interval-component", "n_intervals"), + Input("view-mode", "value"), + Input("prediction-selector", "value"), + Input("clear-button", "n_clicks"), + ], + ) + def update_visualization(n_intervals, view_mode, selected_pred_id, clear_clicks): + + ctx = callback_context + if ctx.triggered and ctx.triggered[0]["prop_id"] == "clear-button.n_clicks": + if clear_clicks > 0: + self.data_store.clear_data() + self.selected_prediction_id = None + + pred_options = [] + pred_value = selected_pred_id + + if self.data_store.predictions: + pred_options = [ + { + "label": f"Prediction #{p.prediction_id} ({p.dominant_freq:.2f} Hz)", + "value": p.prediction_id, + } + for p in self.data_store.predictions[-50:] # Last 50 predictions + ] + + if pred_value is None and self.data_store.predictions: + pred_value = self.data_store.predictions[-1].prediction_id + + if self.data_store.predictions: + status_text = ( + f"Connected - {len(self.data_store.predictions)} predictions received" + ) + status_style = {"textAlign": "center", "color": "#27ae60", "margin": "0"} + else: + status_text = "Waiting for predictions..." + status_style = {"textAlign": "center", "color": "#e74c3c", "margin": "0"} + + stats_bar = self._create_stats_bar() + + if view_mode == "cosine" and pred_value is not None: + fig = CosineWaveViz.create_cosine_plot(self.data_store, pred_value) + viz_component = dcc.Graph(figure=fig, style={"height": "600px"}) + + elif view_mode == "dashboard": + + fig = self._create_cosine_timeline_plot(self.data_store) + viz_component = dcc.Graph(figure=fig, style={"height": "600px"}) + + else: + viz_component = html.Div( + [ + html.H3( + "Select a view mode and prediction to visualize", + style={ + "textAlign": "center", + "color": "#7f8c8d", + "marginTop": "200px", + }, + ) + ] + ) + + return ( + viz_component, + pred_options, + pred_value, + status_text, + status_style, + stats_bar, + ) + + @self.app.callback( + Output("recent-predictions-table", "children"), + [Input("interval-component", "n_intervals")], + ) + def update_recent_predictions_table(n_intervals): + """Update the recent predictions table""" + + if not self.data_store.predictions: + return html.P( + "No predictions yet", + style={"textAlign": "center", "color": "#7f8c8d"}, + ) + + recent_preds = self.data_store.predictions + + seen_ids = set() + unique_preds = [] + for pred in reversed(recent_preds): # Newest first + if pred.prediction_id not in seen_ids: + seen_ids.add(pred.prediction_id) + unique_preds.append(pred) + + rows = [] + for i, pred in enumerate(unique_preds): + + row_style = { + "backgroundColor": "#ffffff" if i % 2 == 0 else "#f8f9fa", + "padding": "8px", + "borderBottom": "1px solid #dee2e6", + } + + if pred.dominant_freq == 0 or pred.dominant_freq is None: + + row = html.Tr( + [ + html.Td( + f"#{pred.prediction_id}", + style={"fontWeight": "bold", "color": "#999"}, + ), + html.Td( + "—", + style={ + "color": "#999", + "textAlign": "center", + "fontStyle": "italic", + }, + ), + html.Td( + "No pattern detected", + style={"color": "#999", "fontStyle": "italic"}, + ), + ], + style=row_style, + ) + else: + + change_point_text = "" + if pred.is_change_point and pred.change_point: + cp = pred.change_point + change_point_text = ( + f"🔴 {cp.old_frequency:.2f} → 
{cp.new_frequency:.2f} Hz" + ) + + row = html.Tr( + [ + html.Td( + f"#{pred.prediction_id}", + style={"fontWeight": "bold", "color": "#495057"}, + ), + html.Td( + f"{pred.dominant_freq:.2f} Hz", style={"color": "#007bff"} + ), + html.Td( + change_point_text, + style={ + "color": "red" if pred.is_change_point else "black" + }, + ), + ], + style=row_style, + ) + + rows.append(row) + + table = html.Table( + [ + html.Thead( + [ + html.Tr( + [ + html.Th( + "ID", + style={ + "backgroundColor": "#6c757d", + "color": "white", + "padding": "12px", + }, + ), + html.Th( + "Frequency", + style={ + "backgroundColor": "#6c757d", + "color": "white", + "padding": "12px", + }, + ), + html.Th( + "Change Point", + style={ + "backgroundColor": "#6c757d", + "color": "white", + "padding": "12px", + }, + ), + ] + ) + ] + ), + html.Tbody(rows), + ], + style={ + "width": "100%", + "borderCollapse": "collapse", + "marginTop": "10px", + "boxShadow": "0 2px 4px rgba(0,0,0,0.1)", + "borderRadius": "8px", + "overflow": "hidden", + }, + ) + + return table + + def _create_stats_bar(self): + """Create statistics bar component""" + + if not self.data_store.predictions: + return html.Div() + + total_preds = len(self.data_store.predictions) + total_changes = len(self.data_store.change_points) + latest_pred = self.data_store.predictions[-1] + + stats_items = [ + html.Div( + [ + html.H4(str(total_preds), style={"margin": "0", "color": "#2c3e50"}), + html.P( + "Total Predictions", + style={"margin": "0", "fontSize": "12px", "color": "#7f8c8d"}, + ), + ], + style={"textAlign": "center", "flex": "1"}, + ), + html.Div( + [ + html.H4( + str(total_changes), style={"margin": "0", "color": "#e74c3c"} + ), + html.P( + "Change Points", + style={"margin": "0", "fontSize": "12px", "color": "#7f8c8d"}, + ), + ], + style={"textAlign": "center", "flex": "1"}, + ), + html.Div( + [ + html.H4( + f"{latest_pred.dominant_freq:.2f} Hz", + style={"margin": "0", "color": "#27ae60"}, + ), + html.P( + "Latest Frequency", + style={"margin": "0", "fontSize": "12px", "color": "#7f8c8d"}, + ), + ], + style={"textAlign": "center", "flex": "1"}, + ), + html.Div( + [ + html.H4( + f"{latest_pred.confidence:.1f}%", + style={"margin": "0", "color": "#3498db"}, + ), + html.P( + "Latest Confidence", + style={"margin": "0", "fontSize": "12px", "color": "#7f8c8d"}, + ), + ], + style={"textAlign": "center", "flex": "1"}, + ), + ] + + return html.Div( + stats_items, + style={ + "display": "flex", + "justifyContent": "space-around", + "backgroundColor": "#f8f9fa", + "padding": "15px", + "borderRadius": "5px", + "border": "1px solid #dee2e6", + }, + ) + + def _on_data_received(self, data): + """Callback when new data is received from socket""" + print(f"[DEBUG] Dashboard received data: {data}") + + if data["type"] == "prediction": + prediction_data = data["data"] + self.data_store.add_prediction(prediction_data) + + print( + f"[DEBUG] Added prediction #{prediction_data.prediction_id}: " + f"{prediction_data.dominant_freq:.2f} Hz " + f"({'CHANGE POINT' if prediction_data.is_change_point else 'normal'})" + ) + + self.last_update = time.time() + else: + print(f"[DEBUG] Received non-prediction data: type={data.get('type')}") + + def _create_cosine_timeline_plot(self, data_store): + """Create single continuous cosine wave showing I/O pattern evolution""" + import plotly.graph_objs as go + + if not data_store.predictions: + fig = go.Figure() + fig.add_annotation( + x=0.5, + y=0.5, + text="Waiting for predictions...", + showarrow=False, + font={"size": 16, "color": "gray"}, + 
) + fig.update_layout( + xaxis={"visible": False}, + yaxis={"visible": False}, + title="I/O Pattern Timeline (Continuous Cosine Wave)", + ) + return fig + + last_3_predictions = data_store.get_latest_predictions(3) + print(f"[DEBUG] Merged view using {len(last_3_predictions)} predictions") + + sorted_predictions = sorted(last_3_predictions, key=lambda p: p.time_window[0]) + + global_time = [] + global_cosine = [] + cumulative_time = 0.0 + segment_info = [] # For change point markers + + # Normalized display: each prediction gets equal width showing 5 cycles + display_cycles = 5 # Show 5 complete cycles per prediction + + for pred in sorted_predictions: + freq = pred.dominant_freq + + if freq == 0 or freq is None: + # No frequency - show flat gap + display_duration = 1.0 # Fixed width for gaps + num_points = 100 + t_local = np.linspace(0, display_duration, num_points) + t_global = cumulative_time + t_local + + global_time.extend(t_global.tolist()) + global_cosine.extend([None] * num_points) + else: + # Normalized: show 5 cycles regardless of actual duration + display_duration = display_cycles / freq # Time for 5 cycles + num_points = 1000 + + t_local = np.linspace(0, display_duration, num_points) + cosine_segment = np.cos(2 * np.pi * freq * t_local) + + t_global = cumulative_time + t_local + + global_time.extend(t_global.tolist()) + global_cosine.extend(cosine_segment.tolist()) + + segment_start = cumulative_time + segment_end = cumulative_time + display_duration + segment_info.append((segment_start, segment_end, pred)) + + cumulative_time += display_duration + + fig = go.Figure() + + fig.add_trace( + go.Scatter( + x=global_time, + y=global_cosine, + mode="lines", + name="I/O Pattern Evolution", + line={"color": "#1f77b4", "width": 2}, + connectgaps=False, # DON'T connect across None values - creates visible gaps + hovertemplate="I/O Pattern
" + + "Time: %{x:.3f} s
" + + "Amplitude: %{y:.3f}", + ) + ) + + for seg_start, seg_end, pred in segment_info: + if pred.dominant_freq == 0 or pred.dominant_freq is None: + fig.add_vrect( + x0=seg_start, + x1=seg_end, + fillcolor="gray", + opacity=0.15, + layer="below", + line_width=0, + annotation_text="No pattern", + annotation_position="top", + ) + + for seg_start, seg_end, pred in segment_info: + if pred.is_change_point and pred.change_point: + marker_time = seg_start # Mark at the START of the changed segment + + fig.add_vline( + x=marker_time, + line_dash="solid", + line_color="red", + line_width=4, + opacity=0.8, + ) + + fig.add_annotation( + x=marker_time, + y=1.1, + text=f"🔴 CHANGE
{pred.change_point.old_frequency:.2f}→{pred.change_point.new_frequency:.2f} Hz", + showarrow=True, + arrowhead=2, + arrowsize=1, + arrowwidth=2, + arrowcolor="red", + ax=0, + ay=-40, + font={"size": 12, "color": "red", "family": "Arial Black"}, + bgcolor="rgba(255,255,255,0.9)", + bordercolor="red", + borderwidth=2, + ) + + fig.update_layout( + title="I/O Pattern Timeline (Continuous Evolution)", + xaxis_title="Time (s) - Concatenated Segments", + yaxis_title="I/O Pattern Amplitude", + showlegend=True, + height=600, + hovermode="x unified", + yaxis={"range": [-1.2, 1.2]}, + uirevision="constant", # Prevents full page refresh - keeps zoom/pan state + ) + + return fig + + def run(self, debug=False): + """Run the Dash application""" + try: + self.app.run(host=self.host, port=self.port, debug=debug) + except KeyboardInterrupt: + print("\nShutting down FTIO Dashboard...") + self.socket_listener.stop_server() + except Exception as e: + print(f"Error running dashboard: {e}") + self.socket_listener.stop_server() + + +def main(): + """Entry point for ftio-gui command""" + parser = argparse.ArgumentParser(description="FTIO Prediction GUI Dashboard") + parser.add_argument( + "--host", default="localhost", help="Dashboard host (default: localhost)" + ) + parser.add_argument( + "--port", type=int, default=8050, help="Dashboard port (default: 8050)" + ) + parser.add_argument( + "--socket-port", + type=int, + default=9999, + help="Socket listener port (default: 9999)", + ) + parser.add_argument("--debug", action="store_true", help="Run in debug mode") + + args = parser.parse_args() + + print("=" * 60) + print("FTIO Prediction GUI Dashboard") + print("=" * 60) + print(f"Dashboard URL: http://{args.host}:{args.port}") + print(f"Socket listener: {args.socket_port}") + print("") + print("Instructions:") + print("1. Start this dashboard") + print("2. Run your FTIO predictor with socket logging enabled") + print("3. Watch real-time predictions and change points in the browser") + print("") + print("Press Ctrl+C to stop") + print("=" * 60) + + try: + dashboard = FTIODashApp( + host=args.host, port=args.port, socket_port=args.socket_port + ) + dashboard.run(debug=args.debug) + except KeyboardInterrupt: + print("\nDashboard stopped by user") + except Exception as e: + print(f"Error: {e}") + import sys + + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/ftio/gui/data_models.py b/ftio/gui/data_models.py new file mode 100644 index 0000000..44bb2ca --- /dev/null +++ b/ftio/gui/data_models.py @@ -0,0 +1,145 @@ +""" +Data models for storing and managing prediction data from FTIO. + +This module provides dataclasses for structured storage of prediction data, +change points, frequency candidates, and a thread-safe data store for +managing prediction history. + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. 
+For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class FrequencyCandidate: + """Individual frequency candidate with confidence""" + + frequency: float + confidence: float + + +@dataclass +class ChangePoint: + """ADWIN detected change point information""" + + prediction_id: int + timestamp: float + old_frequency: float + new_frequency: float + frequency_change_percent: float + sample_number: int + cut_position: int + total_samples: int + + +@dataclass +class PredictionData: + """Single prediction instance data""" + + prediction_id: int + timestamp: str + dominant_freq: float + dominant_period: float + confidence: float + candidates: list[FrequencyCandidate] + time_window: tuple # (start, end) in seconds + total_bytes: str + bytes_transferred: str + current_hits: int + periodic_probability: float + frequency_range: tuple # (min_freq, max_freq) + period_range: tuple # (min_period, max_period) + is_change_point: bool = False + change_point: ChangePoint | None = None + sample_number: int | None = None + + +class PredictionDataStore: + """Manages all prediction data and provides query methods""" + + def __init__(self): + self.predictions: list[PredictionData] = [] + self.change_points: list[ChangePoint] = [] + self.current_prediction_id = -1 + + def add_prediction(self, prediction: PredictionData): + """Add a new prediction to the store""" + self.predictions.append(prediction) + if prediction.is_change_point and prediction.change_point: + self.change_points.append(prediction.change_point) + + def get_prediction_by_id(self, pred_id: int) -> PredictionData | None: + """Get prediction by ID""" + for pred in self.predictions: + if pred.prediction_id == pred_id: + return pred + return None + + def get_frequency_timeline(self) -> tuple: + """Get data for frequency timeline plot""" + if not self.predictions: + return [], [], [] + + pred_ids = [p.prediction_id for p in self.predictions] + frequencies = [p.dominant_freq for p in self.predictions] + confidences = [p.confidence for p in self.predictions] + + return pred_ids, frequencies, confidences + + def get_candidate_frequencies(self) -> dict[int, list[FrequencyCandidate]]: + """Get all candidate frequencies by prediction ID""" + candidates_dict = {} + for pred in self.predictions: + if pred.candidates: + candidates_dict[pred.prediction_id] = pred.candidates + return candidates_dict + + def get_change_points_for_timeline(self) -> tuple: + """Get change point data for timeline visualization""" + if not self.change_points: + return [], [], [] + + pred_ids = [cp.prediction_id for cp in self.change_points] + frequencies = [cp.new_frequency for cp in self.change_points] + labels = [ + f"{cp.old_frequency:.2f} → {cp.new_frequency:.2f} Hz" + for cp in self.change_points + ] + + return pred_ids, frequencies, labels + + def generate_cosine_wave(self, prediction_id: int, num_points: int = 1000) -> tuple: + """Generate cosine wave data for a specific prediction - DOMINANT FREQUENCY ONLY""" + pred = self.get_prediction_by_id(prediction_id) + if not pred: + return [], [], [] + + start_time, end_time = pred.time_window + duration = end_time - start_time + + t_relative = np.linspace(0, duration, num_points) + + primary_wave = np.cos(2 * np.pi * pred.dominant_freq * t_relative) + + candidate_waves = [] + + return t_relative, primary_wave, candidate_waves + + def get_latest_predictions(self, n: int = 50) -> 
list[PredictionData]: + """Get the latest N predictions""" + return self.predictions[-n:] if len(self.predictions) >= n else self.predictions + + def clear_data(self): + """Clear all stored data""" + self.predictions.clear() + self.change_points.clear() + self.current_prediction_id = -1 diff --git a/ftio/gui/socket_listener.py b/ftio/gui/socket_listener.py new file mode 100644 index 0000000..099e3cc --- /dev/null +++ b/ftio/gui/socket_listener.py @@ -0,0 +1,290 @@ +""" +Socket listener for receiving FTIO prediction data via direct JSON transmission. + +This module provides a TCP socket server that receives structured prediction +data from FTIO's online predictor via direct JSON transmission. + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +import contextlib +import json +import logging +import socket +import threading +from collections.abc import Callable + +from ftio.gui.data_models import ( + ChangePoint, + FrequencyCandidate, + PredictionData, +) + + +class SocketListener: + """TCP server that receives FTIO prediction data from the online predictor. + + This class is the receiving end of the predictor-to-GUI communication channel. + The FTIO online predictor (predictor.py) runs as a separate process and sends + prediction results via TCP socket. This SocketListener runs inside the GUI + dashboard process and receives those messages. + + Architecture: + [FTIO Predictor Process] --TCP/JSON--> [SocketListener] --> [GUI Dashboard] + + The socket connection allows the predictor and GUI to run on different machines + if needed (e.g., predictor on HPC cluster, GUI on local workstation). + + Each received message contains: + - Prediction ID and timestamp + - Detected dominant frequency and confidence + - Time window that was analyzed + - Whether a change point was detected + - Change point details (old/new frequency, when it occurred) + + Messages are JSON-formatted and parsed into PredictionData objects, which are + then passed to the dashboard via the data_callback function for visualization. + + Attributes: + host: The hostname to bind the server to (default: "localhost"). + port: The port number to listen on (default: 9999). + data_callback: Function called when new prediction data arrives. + running: Boolean indicating if the server is currently running. + server_socket: The main TCP server socket. + client_connections: List of active client connections. + """ + + def __init__( + self, + host: str = "localhost", + port: int = 9999, + data_callback: Callable | None = None, + ): + """Initialize the socket listener with connection parameters. + + Args: + host: The hostname to bind the server to. Use "localhost" for local + connections only, or "0.0.0.0" to accept connections from any host. + port: The port number to listen on. Must match the port used by the + FTIO predictor's SocketLogger (default: 9999). + data_callback: Function to call when prediction data is received. + The callback receives a dict with 'type' and 'data' keys. + """ + self.host = host + self.port = port + self.data_callback = data_callback + self.running = False + self.server_socket = None + self.client_connections = [] + + def start_server(self): + """Start the TCP server and begin listening for predictor connections. + + This method blocks and runs the main server loop. 
It binds to the configured + host:port, listens for incoming connections, and spawns a new thread for + each connected client (predictor process). + + The server continues running until stop_server() is called or an error occurs. + Each client connection is handled in a separate daemon thread, allowing + multiple predictors to connect simultaneously. + + Raises: + OSError: If the port is already in use (errno 98) or other socket errors. + """ + try: + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + print(f"Attempting to bind to {self.host}:{self.port}") + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen(5) + self.running = True + + print(f" Socket server successfully listening on {self.host}:{self.port}") + + while self.running: + try: + client_socket, address = self.server_socket.accept() + print(f" Client connected from {address}") + + client_thread = threading.Thread( + target=self._handle_client, args=(client_socket, address) + ) + client_thread.daemon = True + client_thread.start() + + except OSError as e: + if self.running: + print(f"Error accepting client connection: {e}") + break + except KeyboardInterrupt: + print(" Socket server interrupted") + break + + except OSError as e: + if e.errno == 98: # Address already in use + print( + f"Port {self.port} is already in use! Please use a different port or kill the process using it." + ) + else: + print(f"OS Error starting socket server: {e}") + self.running = False + except Exception as e: + print(f"Unexpected error starting socket server: {e}") + import traceback + + traceback.print_exc() + self.running = False + finally: + self.stop_server() + + def _handle_client(self, client_socket, address): + """Handle an individual client connection in a dedicated thread. + + Continuously receives JSON messages from the connected predictor process, + parses them into PredictionData objects, and forwards them to the dashboard + via the data_callback. + + Message format expected: + { + "type": "prediction", + "data": { + "prediction_id": int, + "timestamp": float, + "dominant_freq": float, + "confidence": float, + "is_change_point": bool, + "change_point": {...} or null, + ... + } + } + + Args: + client_socket: The socket connection to the client (predictor). + address: Tuple of (host, port) identifying the client. 
+ """ + try: + while self.running: + try: + data = client_socket.recv(4096).decode("utf-8") + if not data: + break + + try: + message_data = json.loads(data) + + if ( + message_data.get("type") == "prediction" + and "data" in message_data + ): + print( + f"[DEBUG] Direct prediction data received: #{message_data['data']['prediction_id']}" + ) + + pred_data = message_data["data"] + + candidates = [] + for cand in pred_data.get("candidates", []): + candidates.append( + FrequencyCandidate( + frequency=cand["frequency"], + confidence=cand["confidence"], + ) + ) + + change_point = None + if pred_data.get("is_change_point") and pred_data.get( + "change_point" + ): + cp_data = pred_data["change_point"] + change_point = ChangePoint( + prediction_id=cp_data["prediction_id"], + timestamp=cp_data["timestamp"], + old_frequency=cp_data["old_frequency"], + new_frequency=cp_data["new_frequency"], + frequency_change_percent=cp_data[ + "frequency_change_percent" + ], + sample_number=cp_data["sample_number"], + cut_position=cp_data["cut_position"], + total_samples=cp_data["total_samples"], + ) + + prediction_data = PredictionData( + prediction_id=pred_data["prediction_id"], + timestamp=pred_data["timestamp"], + dominant_freq=pred_data["dominant_freq"], + dominant_period=pred_data["dominant_period"], + confidence=pred_data["confidence"], + candidates=candidates, + time_window=tuple(pred_data["time_window"]), + total_bytes=pred_data["total_bytes"], + bytes_transferred=pred_data["bytes_transferred"], + current_hits=pred_data["current_hits"], + periodic_probability=pred_data["periodic_probability"], + frequency_range=tuple(pred_data["frequency_range"]), + period_range=tuple(pred_data["period_range"]), + is_change_point=pred_data["is_change_point"], + change_point=change_point, + sample_number=pred_data.get("sample_number"), + ) + + if self.data_callback: + self.data_callback( + {"type": "prediction", "data": prediction_data} + ) + + except json.JSONDecodeError: + pass + + except OSError: + break + + except Exception as e: + logging.error(f"Error handling client {address}: {e}") + finally: + try: + client_socket.close() + print(f"Client {address} disconnected") + except: + pass + + def stop_server(self): + """Stop the server and close all connections. + + Sets running to False to signal all client handler threads to stop, + closes the main server socket, and closes all active client connections. + This method is safe to call multiple times. + """ + self.running = False + if self.server_socket: + with contextlib.suppress(BaseException): + self.server_socket.close() + + for client_socket in self.client_connections: + with contextlib.suppress(BaseException): + client_socket.close() + self.client_connections.clear() + print("Socket server stopped") + + def start_in_thread(self): + """Start the server in a background daemon thread. + + This is the recommended way to start the server when running alongside + the GUI dashboard. The daemon thread allows the program to exit cleanly + even if the server is still running. + + Returns: + threading.Thread: The thread object running the server. Can be used + to check if the server is still alive via thread.is_alive(). 
+ """ + server_thread = threading.Thread(target=self.start_server) + server_thread.daemon = True + server_thread.start() + return server_thread diff --git a/ftio/gui/visualizations.py b/ftio/gui/visualizations.py new file mode 100644 index 0000000..c63f5f0 --- /dev/null +++ b/ftio/gui/visualizations.py @@ -0,0 +1,344 @@ +""" +Plotly/Dash visualization components for FTIO prediction data. + +This module provides visualization components for the FTIO dashboard including +frequency timeline plots, continuous cosine wave displays, change point markers, +and combined dashboard views. + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +import numpy as np +import plotly.graph_objects as go +from plotly.subplots import make_subplots + +from ftio.gui.data_models import PredictionDataStore + + +class FrequencyTimelineViz: + """Creates frequency timeline visualization""" + + @staticmethod + def create_timeline_plot( + data_store: PredictionDataStore, title="FTIO Frequency Timeline" + ): + """Create main frequency timeline plot""" + + pred_ids, frequencies, confidences = data_store.get_frequency_timeline() + + if not pred_ids: + fig = go.Figure() + fig.add_annotation( + text="No prediction data available", + x=0.5, + y=0.5, + xref="paper", + yref="paper", + showarrow=False, + font={"size": 16, "color": "gray"}, + ) + fig.update_layout( + title=title, + xaxis_title="Prediction Index", + yaxis_title="Frequency (Hz)", + height=500, + ) + return fig + + fig = go.Figure() + + fig.add_trace( + go.Scatter( + x=pred_ids, + y=frequencies, + mode="lines+markers", + name="Dominant Frequency", + line={"color": "blue", "width": 2}, + marker={ + "size": 8, + "opacity": [conf / 100.0 for conf in confidences], + "color": "blue", + "line": {"width": 1, "color": "darkblue"}, + }, + hovertemplate="Prediction #%{x}
" + + "Frequency: %{y:.2f} Hz
" + + "Confidence: %{customdata:.1f}%", + customdata=confidences, + ) + ) + + candidates_dict = data_store.get_candidate_frequencies() + for pred_id, candidates in candidates_dict.items(): + for candidate in candidates: + if ( + candidate.frequency + != data_store.get_prediction_by_id(pred_id).dominant_freq + ): + fig.add_trace( + go.Scatter( + x=[pred_id], + y=[candidate.frequency], + mode="markers", + name=f"Candidate (conf: {candidate.confidence:.2f})", + marker={ + "size": 6, + "opacity": candidate.confidence, + "color": "orange", + "symbol": "diamond", + }, + showlegend=False, + hovertemplate="Candidate Frequency
" + + f"Frequency: {candidate.frequency:.2f} Hz
" + + f"Confidence: {candidate.confidence:.2f}", + ) + ) + + cp_pred_ids, cp_frequencies, cp_labels = ( + data_store.get_change_points_for_timeline() + ) + + if cp_pred_ids: + fig.add_trace( + go.Scatter( + x=cp_pred_ids, + y=cp_frequencies, + mode="markers", + name="Change Points", + marker={ + "size": 12, + "color": "red", + "symbol": "diamond", + "line": {"width": 2, "color": "darkred"}, + }, + hovertemplate="Change Point
" + + "Prediction #%{x}
" + + "%{customdata}", + customdata=cp_labels, + ) + ) + + for pred_id, freq, label in zip( + cp_pred_ids, cp_frequencies, cp_labels, strict=False + ): + fig.add_vline( + x=pred_id, + line_dash="dash", + line_color="red", + opacity=0.7, + annotation_text=label, + annotation_position="top", + ) + + fig.update_layout( + title={"text": title, "font": {"size": 18, "color": "darkblue"}}, + xaxis={ + "title": "Prediction Index", + "showgrid": True, + "gridcolor": "lightgray", + "tickmode": "linear", + }, + yaxis={"title": "Frequency (Hz)", "showgrid": True, "gridcolor": "lightgray"}, + hovermode="closest", + height=500, + margin={"l": 60, "r": 60, "t": 80, "b": 60}, + plot_bgcolor="white", + showlegend=True, + legend={ + "x": 0.02, + "y": 0.98, + "bgcolor": "rgba(255, 255, 255, 0.8)", + "bordercolor": "gray", + "borderwidth": 1, + }, + ) + + return fig + + +class CosineWaveViz: + """Creates cosine wave visualization for individual predictions""" + + @staticmethod + def create_cosine_plot( + data_store: PredictionDataStore, prediction_id: int, title=None, num_points=1000 + ): + """Create cosine wave plot for a specific prediction""" + + prediction = data_store.get_prediction_by_id(prediction_id) + if not prediction: + fig = go.Figure() + fig.add_annotation( + text=f"Prediction #{prediction_id} not found", + x=0.5, + y=0.5, + xref="paper", + yref="paper", + showarrow=False, + font={"size": 16, "color": "gray"}, + ) + fig.update_layout( + title=f"Cosine Wave - Prediction #{prediction_id}", + xaxis_title="Time (s)", + yaxis_title="Amplitude", + height=400, + ) + return fig + + t, primary_wave, candidate_waves = data_store.generate_cosine_wave( + prediction_id, num_points + ) + + if title is None: + title = ( + f"Cosine Wave - Prediction #{prediction_id} " + f"(f = {prediction.dominant_freq:.2f} Hz)" + ) + + fig = go.Figure() + + fig.add_trace( + go.Scatter( + x=t, + y=primary_wave, + mode="lines", + name=f"I/O Pattern: {prediction.dominant_freq:.2f} Hz", + line={"color": "#1f77b4", "width": 3}, + hovertemplate="I/O Pattern
" + + "Time: %{x:.3f} s
" + + "Amplitude: %{y:.3f}
" + + f"Frequency: {prediction.dominant_freq:.2f} Hz", + ) + ) + + if prediction.is_change_point and prediction.change_point: + cp_time = prediction.change_point.timestamp + start_time, end_time = prediction.time_window + if start_time <= cp_time <= end_time: + cp_relative = cp_time - start_time + fig.add_vline( + x=cp_relative, + line_dash="dash", + line_color="red", + line_width=3, + opacity=0.8, + annotation_text=( + f"Change Point
" + f"{prediction.change_point.old_frequency:.2f} → " + f"{prediction.change_point.new_frequency:.2f} Hz" + ), + annotation_position="top", + ) + + start_time, end_time = prediction.time_window + duration = end_time - start_time + fig.update_layout( + title={"text": title, "font": {"size": 16, "color": "darkblue"}}, + xaxis={ + "title": f"Time (s) - Duration: {duration:.2f}s", + "range": [0, duration], + "showgrid": True, + "gridcolor": "lightgray", + }, + yaxis={ + "title": "Amplitude", + "range": [-1.2, 1.2], + "showgrid": True, + "gridcolor": "lightgray", + }, + height=400, + margin={"l": 60, "r": 60, "t": 60, "b": 60}, + plot_bgcolor="white", + showlegend=True, + legend={ + "x": 0.02, + "y": 0.98, + "bgcolor": "rgba(255, 255, 255, 0.8)", + "bordercolor": "gray", + "borderwidth": 1, + }, + ) + + return fig + + +class DashboardViz: + """Creates comprehensive dashboard visualization""" + + @staticmethod + def create_dashboard(data_store: PredictionDataStore, selected_prediction_id=None): + """Create comprehensive dashboard with multiple views""" + + fig = make_subplots( + rows=2, + cols=2, + subplot_titles=( + "Frequency Timeline", + "Latest Predictions", + "Cosine Wave View", + "Statistics", + ), + specs=[[{"colspan": 2}, None], [{}, {}]], + row_heights=[0.6, 0.4], + vertical_spacing=0.1, + ) + + timeline_fig = FrequencyTimelineViz.create_timeline_plot(data_store) + for trace in timeline_fig.data: + fig.add_trace(trace, row=1, col=1) + + if selected_prediction_id is not None: + cosine_fig = CosineWaveViz.create_cosine_plot( + data_store, selected_prediction_id + ) + for trace in cosine_fig.data: + fig.add_trace(trace, row=2, col=1) + + stats = DashboardViz._calculate_stats(data_store) + fig.add_trace( + go.Bar( + x=list(stats.keys()), + y=list(stats.values()), + name="Statistics", + marker_color="lightblue", + ), + row=2, + col=2, + ) + + fig.update_layout( + height=800, title_text="FTIO Prediction Dashboard", showlegend=True + ) + + fig.update_xaxes(title_text="Prediction Index", row=1, col=1) + fig.update_yaxes(title_text="Frequency (Hz)", row=1, col=1) + fig.update_xaxes(title_text="Time (s)", row=2, col=1) + fig.update_yaxes(title_text="Amplitude", row=2, col=1) + fig.update_xaxes(title_text="Metric", row=2, col=2) + fig.update_yaxes(title_text="Value", row=2, col=2) + + return fig + + @staticmethod + def _calculate_stats(data_store: PredictionDataStore) -> dict[str, float]: + """Calculate basic statistics from prediction data""" + if not data_store.predictions: + return {} + + frequencies = [p.dominant_freq for p in data_store.predictions] + confidences = [p.confidence for p in data_store.predictions] + + stats = { + "Total Predictions": len(data_store.predictions), + "Change Points": len(data_store.change_points), + "Avg Frequency": np.mean(frequencies), + "Avg Confidence": np.mean(confidences), + "Freq Std Dev": np.std(frequencies), + } + + return stats diff --git a/ftio/parse/args.py b/ftio/parse/args.py index 51bcc22..5979136 100644 --- a/ftio/parse/args.py +++ b/ftio/parse/args.py @@ -257,6 +257,14 @@ def parse_args(argv: list, name="") -> argparse.Namespace: help="specifies the number of hits needed to adapt the time window. A hit occurs once a dominant frequency is found", ) parser.set_defaults(hits=3) + parser.add_argument( + "--online_adaptation", + dest="online_adaptation", + type=str, + choices=["adwin", "cusum", "ph"], + help="change point detection algorithm to use. 'adwin' (default) uses Adaptive Windowing with automatic window sizing and mathematical guarantees. 
'cusum' uses Cumulative Sum detection for rapid change detection. 'ph' uses Page-Hinkley test for sequential change point detection.", + ) + parser.set_defaults(online_adaptation="adwin") parser.add_argument( "-v", "--verbose", @@ -271,6 +279,12 @@ def parse_args(argv: list, name="") -> argparse.Namespace: help="avoids opening the generated HTML file since zmq is used", ) parser.set_defaults(zmq=False) + parser.add_argument( + "--gui", + action="store_true", + help="enables forwarding prediction data to the FTIO GUI dashboard. Start the GUI first with 'ftio-gui' then run predictor with this flag.", + ) + parser.set_defaults(gui=False) parser.add_argument( "--zmq_source", type=str, diff --git a/ftio/prediction/change_point_detection.py b/ftio/prediction/change_point_detection.py new file mode 100644 index 0000000..63e9f71 --- /dev/null +++ b/ftio/prediction/change_point_detection.py @@ -0,0 +1,1325 @@ +""" +Change point detection algorithms for FTIO online predictor. + +This module provides adaptive change point detection algorithms for detecting +I/O pattern changes in streaming data. It includes three algorithms: +- ADWIN: Adaptive Windowing with Hoeffding bounds for statistical guarantees +- AV-CUSUM: Adaptive-Variance Cumulative Sum for rapid change detection +- STPH: Self-Tuning Page-Hinkley test for sequential change point detection + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +from __future__ import annotations + +import math +from typing import Any + +import numpy as np +from rich.console import Console + +from ftio.freq.prediction import Prediction +from ftio.prediction.helper import get_dominant + + +class ChangePointDetector: + """ADWIN detector for I/O pattern changes with automatic window sizing.""" + + def __init__( + self, + delta: float = 0.05, + shared_resources=None, + show_init: bool = True, + verbose: bool = False, + ): + """Initialize ADWIN detector with confidence parameter delta (default: 0.05).""" + self.delta = min(max(delta, 1e-12), 1 - 1e-12) + self.shared_resources = shared_resources + self.verbose = verbose + + if shared_resources and not shared_resources.detector_initialized.value: + if hasattr(shared_resources, "detector_lock"): + with shared_resources.detector_lock: + if not shared_resources.detector_initialized.value: + shared_resources.detector_frequencies[:] = [] + shared_resources.detector_timestamps[:] = [] + shared_resources.detector_total_samples.value = 0 + shared_resources.detector_change_count.value = 0 + shared_resources.detector_last_change_time.value = 0.0 + shared_resources.detector_initialized.value = True + else: + if not shared_resources.detector_initialized.value: + shared_resources.detector_frequencies[:] = [] + shared_resources.detector_timestamps[:] = [] + shared_resources.detector_total_samples.value = 0 + shared_resources.detector_change_count.value = 0 + shared_resources.detector_last_change_time.value = 0.0 + shared_resources.detector_initialized.value = True + + if shared_resources is None: + self.frequencies: list[float] = [] + self.timestamps: list[float] = [] + self.total_samples = 0 + self.change_count = 0 + self.last_change_time: float | None = None + + self.last_change_point: int | None = None + self.min_window_size = 2 + self.console = Console() + + if show_init: + self.console.print( + f"[green][ADWIN] Initialized 
with δ={delta:.3f} " + f"({(1-delta)*100:.0f}% confidence) " + f"[Process-safe: {shared_resources is not None}][/]" + ) + + def _get_frequencies(self): + if self.shared_resources: + return self.shared_resources.detector_frequencies + return self.frequencies + + def _get_timestamps(self): + if self.shared_resources: + return self.shared_resources.detector_timestamps + return self.timestamps + + def _get_total_samples(self): + if self.shared_resources: + return self.shared_resources.detector_total_samples.value + return self.total_samples + + def _set_total_samples(self, value): + if self.shared_resources: + self.shared_resources.detector_total_samples.value = value + else: + self.total_samples = value + + def _get_change_count(self): + if self.shared_resources: + return self.shared_resources.detector_change_count.value + return self.change_count + + def _set_change_count(self, value): + if self.shared_resources: + self.shared_resources.detector_change_count.value = value + else: + self.change_count = value + + def _get_last_change_time(self): + if self.shared_resources: + return ( + self.shared_resources.detector_last_change_time.value + if self.shared_resources.detector_last_change_time.value > 0 + else None + ) + return self.last_change_time + + def _set_last_change_time(self, value): + if self.shared_resources: + self.shared_resources.detector_last_change_time.value = ( + value if value is not None else 0.0 + ) + else: + self.last_change_time = value + + def _reset_window(self): + frequencies = self._get_frequencies() + timestamps = self._get_timestamps() + + if self.shared_resources: + del frequencies[:] + del timestamps[:] + self._set_total_samples(0) + self._set_last_change_time(None) + else: + self.frequencies.clear() + self.timestamps.clear() + self._set_total_samples(0) + self._set_last_change_time(None) + + self.console.print( + "[dim yellow][ADWIN] Window cleared: No frequency data to analyze[/]" + ) + + def add_prediction( + self, prediction: Prediction, timestamp: float + ) -> tuple[int, float] | None: + + freq = get_dominant(prediction) + + if np.isnan(freq) or freq <= 0: + self.console.print( + "[yellow][ADWIN] No frequency found - resetting window history[/]" + ) + self._reset_window() + return None + + if self.shared_resources and hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + return self._add_prediction_synchronized(prediction, timestamp, freq) + else: + return self._add_prediction_local(prediction, timestamp, freq) + + def _add_prediction_synchronized( + self, prediction: Prediction, timestamp: float, freq: float + ) -> tuple[int, float] | None: + frequencies = self._get_frequencies() + timestamps = self._get_timestamps() + + frequencies.append(freq) + timestamps.append(timestamp) + self._set_total_samples(self._get_total_samples() + 1) + + if len(frequencies) < self.min_window_size: + return None + + change_point = self._detect_change() + + if change_point is not None: + exact_change_timestamp = timestamps[change_point] + + self._process_change_point(change_point) + self._set_change_count(self._get_change_count() + 1) + + return (change_point, exact_change_timestamp) + + return None + + def _add_prediction_local( + self, prediction: Prediction, timestamp: float, freq: float + ) -> tuple[int, float] | None: + frequencies = self._get_frequencies() + timestamps = self._get_timestamps() + + frequencies.append(freq) + timestamps.append(timestamp) + self._set_total_samples(self._get_total_samples() + 1) + + if len(frequencies) < 
self.min_window_size:
+            return None
+
+        change_point = self._detect_change()
+
+        if change_point is not None:
+            exact_change_timestamp = timestamps[change_point]
+
+            self._process_change_point(change_point)
+            self._set_change_count(self._get_change_count() + 1)
+
+            return (change_point, exact_change_timestamp)
+
+        return None
+
+    def _detect_change(self) -> int | None:
+        """Scan all admissible cut points and return the first statistically significant one."""
+        frequencies = self._get_frequencies()
+        timestamps = self._get_timestamps()
+        n = len(frequencies)
+
+        if n < 2 * self.min_window_size:
+            return None
+
+        for cut in range(self.min_window_size, n - self.min_window_size + 1):
+            if self._test_cut_point(cut):
+                self.console.print(
+                    f"[blue][ADWIN] Change detected at position {cut}/{n}, "
+                    f"time={timestamps[cut]:.3f}s[/]"
+                )
+                return cut
+
+        return None
+
+    def _test_cut_point(self, cut: int) -> bool:
+        """Check whether the means left and right of `cut` differ beyond the Hoeffding bound."""
+        frequencies = self._get_frequencies()
+
+        left_data = frequencies[:cut]
+        right_data = frequencies[cut:]
+        n0 = len(left_data)
+        n1 = len(right_data)
+
+        # Guard against empty halves before taking means (np.mean([]) yields NaN)
+        if n0 <= 0 or n1 <= 0:
+            return False
+
+        mean0 = np.mean(left_data)
+        mean1 = np.mean(right_data)
+
+        n_harmonic = (n0 * n1) / (n0 + n1)
+
+        try:
+            confidence_term = math.log(2.0 / self.delta) / (2.0 * n_harmonic)
+            threshold = math.sqrt(2.0 * confidence_term)
+        except (ValueError, ZeroDivisionError):
+            threshold = 0.05
+
+        mean_diff = abs(mean1 - mean0)
+
+        if self.verbose:
+            self.console.print(f"[dim blue][ADWIN DEBUG] Cut={cut}:[/]")
+            self.console.print(
+                f"  [dim]• Left window: {n0} samples, mean={mean0:.3f}Hz[/]"
+            )
+            self.console.print(
+                f"  [dim]• Right window: {n1} samples, mean={mean1:.3f}Hz[/]"
+            )
+            self.console.print(
+                f"  [dim]• Mean difference: |{mean1:.3f} - {mean0:.3f}| = {mean_diff:.3f}[/]"
+            )
+            self.console.print(f"  [dim]• Harmonic mean: {n_harmonic:.1f}[/]")
+            self.console.print(
+                f"  [dim]• Confidence term: log(2/{self.delta}) / (2×{n_harmonic:.1f}) = {confidence_term:.6f}[/]"
+            )
+            self.console.print(
+                f"  [dim]• Threshold: √(2×{confidence_term:.6f}) = {threshold:.3f}[/]"
+            )
+            self.console.print(
+                f"  [dim]• Test: {mean_diff:.3f} > {threshold:.3f} ? {'CHANGE!' 
if mean_diff > threshold else 'No change'}[/]" + ) + + return mean_diff > threshold + + def _process_change_point(self, change_point: int): + + frequencies = self._get_frequencies() + timestamps = self._get_timestamps() + + self.last_change_point = change_point + change_time = timestamps[change_point] + self._set_last_change_time(change_time) + + old_window_size = len(frequencies) + old_freq = np.mean(frequencies[:change_point]) if change_point > 0 else 0 + + if self.shared_resources: + del frequencies[:change_point] + del timestamps[:change_point] + new_frequencies = frequencies + new_timestamps = timestamps + else: + self.frequencies = frequencies[change_point:] + self.timestamps = timestamps[change_point:] + new_frequencies = self.frequencies + new_timestamps = self.timestamps + + new_window_size = len(new_frequencies) + new_freq = np.mean(new_frequencies) if new_frequencies else 0 + + freq_change = abs(new_freq - old_freq) / old_freq * 100 if old_freq > 0 else 0 + time_span = ( + new_timestamps[-1] - new_timestamps[0] if len(new_timestamps) > 1 else 0 + ) + + self.console.print( + f"[green][ADWIN] Window adapted: " + f"{old_window_size} → {new_window_size} samples[/]" + ) + self.console.print( + f"[green][ADWIN] Frequency shift: " + f"{old_freq:.3f} → {new_freq:.3f} Hz ({freq_change:.1f}%)[/]" + ) + self.console.print(f"[green][ADWIN] New window span: {time_span:.2f} seconds[/]") + + def get_adaptive_start_time(self, current_prediction: Prediction) -> float: + + timestamps = self._get_timestamps() + + if len(timestamps) == 0: + return current_prediction.t_start + + last_change_time = self._get_last_change_time() + if last_change_time is not None: + exact_change_start = last_change_time + + min_window = 0.5 + max_lookback = 10.0 + + window_span = current_prediction.t_end - exact_change_start + + if window_span < min_window: + adaptive_start = max(0, current_prediction.t_end - min_window) + self.console.print( + f"[yellow][ADWIN] Change point too recent, using min window: " + f"{adaptive_start:.6f}s[/]" + ) + elif window_span > max_lookback: + adaptive_start = max(0, current_prediction.t_end - max_lookback) + self.console.print( + f"[yellow][ADWIN] Change point too old, using max lookback: " + f"{adaptive_start:.6f}s[/]" + ) + else: + adaptive_start = exact_change_start + self.console.print( + f"[green][ADWIN] Using EXACT change point timestamp: " + f"{adaptive_start:.6f}s (window span: {window_span:.3f}s)[/]" + ) + + return adaptive_start + + window_start = timestamps[0] + + min_start = current_prediction.t_end - 10.0 + max_start = current_prediction.t_end - 0.5 + + adaptive_start = max(min_start, min(window_start, max_start)) + + return adaptive_start + + def get_window_stats(self) -> dict[str, Any]: + """Get current ADWIN window statistics for debugging and logging.""" + frequencies = self._get_frequencies() + timestamps = self._get_timestamps() + + if not frequencies: + return { + "size": 0, + "mean": 0.0, + "std": 0.0, + "range": [0.0, 0.0], + "time_span": 0.0, + "total_samples": self._get_total_samples(), + "change_count": self._get_change_count(), + } + + return { + "size": len(frequencies), + "mean": np.mean(frequencies), + "std": np.std(frequencies), + "range": [float(np.min(frequencies)), float(np.max(frequencies))], + "time_span": ( + float(timestamps[-1] - timestamps[0]) if len(timestamps) > 1 else 0.0 + ), + "total_samples": self._get_total_samples(), + "change_count": self._get_change_count(), + } + + def should_adapt_window(self) -> bool: + """Check if window adaptation 
should be triggered.""" + return self.last_change_point is not None + + def log_change_point(self, counter: int, old_freq: float, new_freq: float) -> str: + + last_change_time = self._get_last_change_time() + if last_change_time is None: + return "" + + freq_change_pct = abs(new_freq - old_freq) / old_freq * 100 if old_freq > 0 else 0 + stats = self.get_window_stats() + + log_msg = ( + f"[red bold][CHANGE_POINT] t_s={last_change_time:.3f} sec[/]\n" + f"[purple][PREDICTOR] (#{counter}):[/][yellow] " + f"ADWIN detected pattern change: {old_freq:.3f} → {new_freq:.3f} Hz " + f"({freq_change_pct:.1f}% change)[/]\n" + f"[purple][PREDICTOR] (#{counter}):[/][yellow] " + f"Adaptive window: {stats['size']} samples, " + f"span={stats['time_span']:.1f}s, " + f"changes={stats['change_count']}/{stats['total_samples']}[/]\n" + f"[dim blue]ADWIN ANALYSIS: Statistical significance detected using Hoeffding bounds[/]\n" + f"[dim blue]Window split analysis found mean difference > confidence threshold[/]\n" + f"[dim blue]Confidence level: {(1-self.delta)*100:.0f}% (δ={self.delta:.3f})[/]" + ) + + self.last_change_point = None + + return log_msg + + def get_change_point_time(self, shared_resources=None) -> float | None: + + return self._get_last_change_time() + + +def detect_pattern_change_adwin( + shared_resources, + current_prediction: Prediction, + detector: ChangePointDetector, + counter: int, +) -> tuple[bool, str | None, float, float | None, float | None]: + + change_point = detector.add_prediction(current_prediction, current_prediction.t_end) + + if change_point is not None: + change_idx, change_time = change_point + + current_freq = get_dominant(current_prediction) + + old_freq = current_freq + frequencies = detector._get_frequencies() + if len(frequencies) > 1: + window_stats = detector.get_window_stats() + old_freq = max(0.1, window_stats["mean"] * 0.9) + + log_msg = detector.log_change_point(counter, old_freq, current_freq) + + new_start_time = detector.get_adaptive_start_time(current_prediction) + + try: + from ftio.prediction.online_analysis import get_socket_logger + + logger = get_socket_logger() + if logger is not None: + logger.send_log( + "change_point", + "ADWIN Change Point Detected", + { + "exact_time": change_time, + "old_freq": old_freq, + "new_freq": current_freq, + "adaptive_start": new_start_time, + "counter": counter, + }, + ) + except ImportError: + pass + + return True, log_msg, new_start_time, old_freq, current_freq + + return False, None, current_prediction.t_start, None, None + + +class CUSUMDetector: + """Adaptive-Variance CUSUM detector with variance-based threshold adaptation.""" + + def __init__( + self, + window_size: int = 50, + shared_resources=None, + show_init: bool = True, + verbose: bool = False, + ): + """Initialize AV-CUSUM detector with rolling window size (default: 50).""" + self.window_size = window_size + self.shared_resources = shared_resources + self.show_init = show_init + self.verbose = verbose + + self.sum_pos = 0.0 + self.sum_neg = 0.0 + self.reference = None + self.initialized = False + + self.adaptive_threshold = 0.0 + self.adaptive_drift = 0.0 + self.rolling_std = 0.0 + self.frequency_buffer = [] + + self.console = Console() + + def _update_adaptive_parameters(self, freq: float): + """Calculate thresholds automatically from data standard deviation.""" + import numpy as np + + if self.shared_resources and hasattr( + self.shared_resources, "detector_frequencies" + ): + if hasattr(self.shared_resources, "detector_lock"): + with 
self.shared_resources.detector_lock: + all_freqs = list(self.shared_resources.detector_frequencies) + recent_freqs = ( + all_freqs[-self.window_size - 1 : -1] + if len(all_freqs) > 1 + else [] + ) + else: + all_freqs = list(self.shared_resources.detector_frequencies) + recent_freqs = ( + all_freqs[-self.window_size - 1 : -1] if len(all_freqs) > 1 else [] + ) + else: + self.frequency_buffer.append(freq) + if len(self.frequency_buffer) > self.window_size: + self.frequency_buffer.pop(0) + recent_freqs = ( + self.frequency_buffer[:-1] if len(self.frequency_buffer) > 1 else [] + ) + + if self.verbose: + self.console.print( + f"[dim magenta][CUSUM DEBUG] Buffer for σ calculation (excluding current): {[f'{f:.3f}' for f in recent_freqs]} (len={len(recent_freqs)})[/]" + ) + + if len(recent_freqs) >= 3: + freqs = np.array(recent_freqs) + self.rolling_std = np.std(freqs) + + std_factor = max(self.rolling_std, 0.01) + + self.adaptive_threshold = 2.0 * std_factor + self.adaptive_drift = 0.5 * std_factor + + if self.verbose: + self.console.print( + f"[dim cyan][CUSUM] σ={self.rolling_std:.3f}, " + f"h_t={self.adaptive_threshold:.3f} (2σ threshold), " + f"k_t={self.adaptive_drift:.3f} (0.5σ drift)[/]" + ) + + def _reset_cusum_state(self): + """Reset CUSUM state when no frequency is detected.""" + self.sum_pos = 0.0 + self.sum_neg = 0.0 + self.reference = None + self.initialized = False + + self.frequency_buffer.clear() + self.rolling_std = 0.0 + self.adaptive_threshold = 0.0 + self.adaptive_drift = 0.0 + + if self.shared_resources: + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + del self.shared_resources.detector_frequencies[:] + del self.shared_resources.detector_timestamps[:] + else: + del self.shared_resources.detector_frequencies[:] + del self.shared_resources.detector_timestamps[:] + + self.console.print( + "[dim yellow][CUSUM] State cleared: Starting fresh when frequency resumes[/]" + ) + + def add_frequency( + self, freq: float, timestamp: float = None + ) -> tuple[bool, dict[str, Any]]: + + if np.isnan(freq) or freq <= 0: + self.console.print( + "[yellow][AV-CUSUM] No frequency found - resetting algorithm state[/]" + ) + self._reset_cusum_state() + return False, {} + + if self.shared_resources: + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + self.shared_resources.detector_frequencies.append(freq) + self.shared_resources.detector_timestamps.append(timestamp or 0.0) + else: + self.shared_resources.detector_frequencies.append(freq) + self.shared_resources.detector_timestamps.append(timestamp or 0.0) + + self._update_adaptive_parameters(freq) + + if not self.initialized: + min_init_samples = 3 + if self.shared_resources: + freq_list = list(self.shared_resources.detector_frequencies) + else: + freq_list = self.frequency_buffer + + if len(freq_list) >= min_init_samples: + first_freqs = freq_list[:min_init_samples] + self.reference = np.mean(first_freqs) + self.initialized = True + if self.show_init: + self.console.print( + f"[yellow][AV-CUSUM] Reference established: {self.reference:.3f} Hz " + f"(from first {min_init_samples} observations: {[f'{f:.3f}' for f in first_freqs]})[/]" + ) + else: + current_count = len(freq_list) + self.console.print( + f"[dim yellow][AV-CUSUM] Collecting calibration data ({current_count}/{min_init_samples})[/]" + ) + return False, {} + + deviation = freq - self.reference + + new_sum_pos = max(0, self.sum_pos + deviation - self.adaptive_drift) + new_sum_neg = max(0, 
self.sum_neg - deviation - self.adaptive_drift)
+
+        # Capture pre-update sums so the verbose trace reports true "before" values
+        old_sum_pos = self.sum_pos
+        old_sum_neg = self.sum_neg
+        self.sum_pos = new_sum_pos
+        self.sum_neg = new_sum_neg
+
+        if self.verbose:
+            current_window_size = (
+                len(self.shared_resources.detector_frequencies)
+                if self.shared_resources
+                else 0
+            )
+
+            self.console.print(
+                f"[dim yellow][AV-CUSUM DEBUG] Observation #{current_window_size}:[/]"
+            )
+            self.console.print(f"  [dim]• Current freq: {freq:.3f} Hz[/]")
+            self.console.print(f"  [dim]• Reference: {self.reference:.3f} Hz[/]")
+            self.console.print(
+                f"  [dim]• Deviation: {freq:.3f} - {self.reference:.3f} = {deviation:.3f}[/]"
+            )
+            self.console.print(
+                f"  [dim]• Adaptive drift: {self.adaptive_drift:.3f} (k_t = 0.5×σ, σ={self.rolling_std:.3f})[/]"
+            )
+            self.console.print(f"  [dim]• Sum_pos before: {old_sum_pos:.3f}[/]")
+            self.console.print(f"  [dim]• Sum_neg before: {old_sum_neg:.3f}[/]")
+            self.console.print(
+                f"  [dim]• Sum_pos calculation: max(0, {old_sum_pos:.3f} + {deviation:.3f} - {self.adaptive_drift:.3f}) = {new_sum_pos:.3f}[/]"
+            )
+            self.console.print(
+                f"  [dim]• Sum_neg calculation: max(0, {old_sum_neg:.3f} - {deviation:.3f} - {self.adaptive_drift:.3f}) = {new_sum_neg:.3f}[/]"
+            )
+            self.console.print(
+                f"  [dim]• Adaptive threshold: {self.adaptive_threshold:.3f} (h_t = 2.0×σ, σ={self.rolling_std:.3f})[/]"
+            )
+            self.console.print(
+                f"  [dim]• Upward change test: {self.sum_pos:.3f} > {self.adaptive_threshold:.3f} = {'UPWARD CHANGE!' if self.sum_pos > self.adaptive_threshold else 'No change'}[/]"
+            )
+            self.console.print(
+                f"  [dim]• Downward change test: {self.sum_neg:.3f} > {self.adaptive_threshold:.3f} = {'DOWNWARD CHANGE!' if self.sum_neg > self.adaptive_threshold else 'No change'}[/]"
+            )
+
+        if self.shared_resources and hasattr(
+            self.shared_resources, "detector_frequencies"
+        ):
+            sample_count = len(self.shared_resources.detector_frequencies)
+        else:
+            sample_count = len(self.frequency_buffer)
+
+        if sample_count < 3 or self.adaptive_threshold <= 0:
+            return False, {}
+
+        upward_change = self.sum_pos > self.adaptive_threshold
+        downward_change = self.sum_neg > self.adaptive_threshold
+        change_detected = upward_change or downward_change
+
+        change_info = {
+            "timestamp": timestamp,
+            "frequency": freq,
+            "reference": self.reference,
+            "sum_pos": self.sum_pos,
+            "sum_neg": self.sum_neg,
+            "threshold": self.adaptive_threshold,
+            "rolling_std": self.rolling_std,
+            "deviation": deviation,
+            "change_type": (
+                "increase" if upward_change else "decrease" if downward_change else "none"
+            ),
+        }
+
+        if change_detected:
+            change_type = change_info["change_type"]
+            change_percent = (
+                abs(deviation / self.reference * 100) if self.reference != 0 else 0
+            )
+
+            self.console.print(
+                f"[bold yellow][AV-CUSUM] CHANGE DETECTED! 
" + f"{self.reference:.3f}Hz → {freq:.3f}Hz " + f"({change_percent:.1f}% {change_type})[/]" + ) + self.console.print( + f"[yellow][AV-CUSUM] Sum_pos={self.sum_pos:.2f}, Sum_neg={self.sum_neg:.2f}, " + f"Adaptive_Threshold={self.adaptive_threshold:.2f}[/]" + ) + self.console.print( + f"[dim yellow]AV-CUSUM ANALYSIS: Cumulative sum exceeded adaptive threshold {self.adaptive_threshold:.2f}[/]" + ) + self.console.print( + f"[dim yellow]Detection method: {'Positive sum (upward trend)' if upward_change else 'Negative sum (downward trend)'}[/]" + ) + self.console.print( + f"[dim yellow]Adaptive drift: {self.adaptive_drift:.3f} (σ={self.rolling_std:.3f})[/]" + ) + + old_reference = self.reference + self.reference = freq + self.console.print( + f"[cyan][CUSUM] Reference updated: {old_reference:.3f} → {self.reference:.3f} Hz " + f"({change_percent:.1f}% change)[/]" + ) + + self.sum_pos = 0.0 + self.sum_neg = 0.0 + + if self.shared_resources: + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + old_window_size = len(self.shared_resources.detector_frequencies) + + current_freq_list = [freq] + current_timestamp_list = [timestamp or 0.0] + + self.shared_resources.detector_frequencies[:] = current_freq_list + self.shared_resources.detector_timestamps[:] = ( + current_timestamp_list + ) + + self.console.print( + f"[green][CUSUM] CHANGE POINT ADAPTATION: Discarded {old_window_size-1} past samples, " + f"starting fresh from current detection[/]" + ) + self.console.print( + f"[green][CUSUM] WINDOW RESET: {old_window_size} → {len(self.shared_resources.detector_frequencies)} samples[/]" + ) + + self.shared_resources.detector_change_count.value += 1 + else: + old_window_size = len(self.shared_resources.detector_frequencies) + current_freq_list = [freq] + current_timestamp_list = [timestamp or 0.0] + self.shared_resources.detector_frequencies[:] = current_freq_list + self.shared_resources.detector_timestamps[:] = current_timestamp_list + self.console.print( + f"[green][CUSUM] CHANGE POINT ADAPTATION: Discarded {old_window_size-1} past samples[/]" + ) + self.shared_resources.detector_change_count.value += 1 + + return change_detected, change_info + + +def detect_pattern_change_cusum( + shared_resources, + current_prediction: Prediction, + detector: CUSUMDetector, + counter: int, +) -> tuple[bool, str | None, float, float | None, float | None]: + + current_freq = get_dominant(current_prediction) + current_time = current_prediction.t_end + + if np.isnan(current_freq): + detector._reset_cusum_state() + return False, None, current_prediction.t_start, None, None + + change_detected, change_info = detector.add_frequency(current_freq, current_time) + + if not change_detected: + return False, None, current_prediction.t_start, None, None + + change_type = change_info["change_type"] + reference = change_info["reference"] + threshold = change_info["threshold"] + sum_pos = change_info["sum_pos"] + sum_neg = change_info["sum_neg"] + + magnitude = abs(current_freq - reference) + percent_change = (magnitude / reference * 100) if reference > 0 else 0 + + log_msg = ( + f"[bold red][CUSUM] CHANGE DETECTED! 
" + f"{reference:.1f}Hz → {current_freq:.1f}Hz " + f"(Δ={magnitude:.1f}Hz, {percent_change:.1f}% {change_type}) " + f"at sample {len(shared_resources.detector_frequencies)}, time={current_time:.3f}s[/]\n" + f"[red][CUSUM] CUSUM stats: sum_pos={sum_pos:.2f}, sum_neg={sum_neg:.2f}, " + f"threshold={threshold}[/]\n" + f"[red][CUSUM] Cumulative sum exceeded threshold -> Starting fresh analysis[/]" + ) + + if percent_change > 100: + min_window_size = 0.5 + + elif percent_change > 50: + min_window_size = 1.0 + else: + min_window_size = 2.0 + + new_start_time = max(0, current_time - min_window_size) + + try: + from ftio.prediction.online_analysis import get_socket_logger + + logger = get_socket_logger() + if logger is not None: + logger.send_log( + "change_point", + "CUSUM Change Point Detected", + { + "algorithm": "CUSUM", + "detection_time": current_time, + "change_type": change_type, + "frequency": current_freq, + "reference": reference, + "magnitude": magnitude, + "percent_change": percent_change, + "threshold": threshold, + "counter": counter, + }, + ) + except ImportError: + pass + + return True, log_msg, new_start_time, reference, current_freq + + +class SelfTuningPageHinkleyDetector: + """Self-Tuning Page-Hinkley detector with adaptive running mean baseline.""" + + def __init__( + self, + window_size: int = 10, + shared_resources=None, + show_init: bool = True, + verbose: bool = False, + ): + """Initialize STPH detector with rolling window size (default: 10).""" + self.window_size = window_size + self.shared_resources = shared_resources + self.show_init = show_init + self.verbose = verbose + self.console = Console() + + self.adaptive_threshold = 0.0 + self.adaptive_delta = 0.0 + self.rolling_std = 0.0 + self.frequency_buffer = [] + + self.cumulative_sum_pos = 0.0 + self.cumulative_sum_neg = 0.0 + self.reference_mean = 0.0 + self.sum_of_samples = 0.0 + self.sample_count = 0 + + if shared_resources and hasattr(shared_resources, "detector_state"): + try: + state = dict(shared_resources.detector_state) + if state.get("initialized", False): + self.cumulative_sum_pos = state.get("cumulative_sum_pos", 0.0) + self.cumulative_sum_neg = state.get("cumulative_sum_neg", 0.0) + self.reference_mean = state.get("reference_mean", 0.0) + self.sum_of_samples = state.get("sum_of_samples", 0.0) + self.sample_count = state.get("sample_count", 0) + if self.verbose: + self.console.print( + f"[green][PH DEBUG] Restored state: cusum_pos={self.cumulative_sum_pos:.3f}, cusum_neg={self.cumulative_sum_neg:.3f}, ref_mean={self.reference_mean:.3f}[/]" + ) + else: + self._initialize_fresh_state() + except Exception as e: + if self.verbose: + self.console.print(f"[red][PH DEBUG] State restore failed: {e}[/]") + self._initialize_fresh_state() + else: + self._initialize_fresh_state() + + def _update_adaptive_parameters(self, freq: float): + """Calculate thresholds automatically from data standard deviation.""" + import numpy as np + + if self.shared_resources and hasattr( + self.shared_resources, "detector_frequencies" + ): + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + all_freqs = list(self.shared_resources.detector_frequencies) + recent_freqs = ( + all_freqs[-self.window_size - 1 : -1] + if len(all_freqs) > 1 + else [] + ) + else: + all_freqs = list(self.shared_resources.detector_frequencies) + recent_freqs = ( + all_freqs[-self.window_size - 1 : -1] if len(all_freqs) > 1 else [] + ) + else: + self.frequency_buffer.append(freq) + if len(self.frequency_buffer) > 
self.window_size: + self.frequency_buffer.pop(0) + recent_freqs = ( + self.frequency_buffer[:-1] if len(self.frequency_buffer) > 1 else [] + ) + + if len(recent_freqs) >= 3: + freqs = np.array(recent_freqs) + self.rolling_std = np.std(freqs) + + std_factor = max(self.rolling_std, 0.01) + + self.adaptive_threshold = 2.0 * std_factor + self.adaptive_delta = 0.5 * std_factor + + if self.verbose: + self.console.print( + f"[dim magenta][Page-Hinkley] σ={self.rolling_std:.3f}, " + f"λ_t={self.adaptive_threshold:.3f} (2σ threshold), " + f"δ_t={self.adaptive_delta:.3f} (0.5σ delta)[/]" + ) + + def _reset_detector_state(self): + """Reset Page-Hinkley state when no frequency is detected.""" + self.cumulative_sum_pos = 0.0 + self.cumulative_sum_neg = 0.0 + self.reference_mean = 0.0 + self.sum_of_samples = 0.0 + self.sample_count = 0 + + self.frequency_buffer.clear() + self.rolling_std = 0.0 + self.adaptive_threshold = 0.0 + self.adaptive_delta = 0.0 + + if self.shared_resources: + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + if hasattr(self.shared_resources, "detector_frequencies"): + del self.shared_resources.detector_frequencies[:] + if hasattr(self.shared_resources, "detector_timestamps"): + del self.shared_resources.detector_timestamps[:] + if hasattr(self.shared_resources, "detector_state"): + self.shared_resources.detector_state.clear() + else: + if hasattr(self.shared_resources, "detector_frequencies"): + del self.shared_resources.detector_frequencies[:] + if hasattr(self.shared_resources, "detector_timestamps"): + del self.shared_resources.detector_timestamps[:] + if hasattr(self.shared_resources, "detector_state"): + self.shared_resources.detector_state.clear() + + self.console.print( + "[dim yellow][STPH] State cleared: Starting fresh when frequency resumes[/]" + ) + + def _initialize_fresh_state(self): + """Initialize fresh Page-Hinkley state.""" + self.cumulative_sum_pos = 0.0 + self.cumulative_sum_neg = 0.0 + self.reference_mean = 0.0 + self.sum_of_samples = 0.0 + self.sample_count = 0 + + def reset(self, current_freq: float = None): + + self.cumulative_sum_pos = 0.0 + self.cumulative_sum_neg = 0.0 + + if current_freq is not None: + self.reference_mean = current_freq + self.sum_of_samples = current_freq + self.sample_count = 1 + else: + self.reference_mean = 0.0 + self.sum_of_samples = 0.0 + self.sample_count = 0 + + if self.shared_resources: + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + if hasattr(self.shared_resources, "detector_state"): + self.shared_resources.detector_state.update( + { + "cumulative_sum_pos": 0.0, + "cumulative_sum_neg": 0.0, + "reference_mean": self.reference_mean, + "sum_of_samples": self.sum_of_samples, + "sample_count": self.sample_count, + "initialized": True, + } + ) + + if hasattr(self.shared_resources, "detector_frequencies"): + if current_freq is not None: + self.shared_resources.detector_frequencies[:] = [current_freq] + else: + del self.shared_resources.detector_frequencies[:] + if hasattr(self.shared_resources, "detector_timestamps"): + if current_freq is not None: + last_timestamp = ( + self.shared_resources.detector_timestamps[-1] + if len(self.shared_resources.detector_timestamps) > 0 + else 0.0 + ) + self.shared_resources.detector_timestamps[:] = [ + last_timestamp + ] + else: + del self.shared_resources.detector_timestamps[:] + else: + if hasattr(self.shared_resources, "detector_state"): + self.shared_resources.detector_state.update( + { + 
"cumulative_sum_pos": 0.0, + "cumulative_sum_neg": 0.0, + "reference_mean": self.reference_mean, + "sum_of_samples": self.sum_of_samples, + "sample_count": self.sample_count, + "initialized": True, + } + ) + if hasattr(self.shared_resources, "detector_frequencies"): + if current_freq is not None: + self.shared_resources.detector_frequencies[:] = [current_freq] + else: + del self.shared_resources.detector_frequencies[:] + if hasattr(self.shared_resources, "detector_timestamps"): + if current_freq is not None: + last_timestamp = ( + self.shared_resources.detector_timestamps[-1] + if len(self.shared_resources.detector_timestamps) > 0 + else 0.0 + ) + self.shared_resources.detector_timestamps[:] = [last_timestamp] + else: + del self.shared_resources.detector_timestamps[:] + + if current_freq is not None: + self.console.print( + f"[cyan][PH] Internal state reset with new reference: {current_freq:.3f} Hz[/]" + ) + else: + self.console.print( + "[cyan][PH] Internal state reset: Page-Hinkley parameters reinitialized[/]" + ) + + def add_frequency( + self, freq: float, timestamp: float = None + ) -> tuple[bool, float, dict[str, Any]]: + + if np.isnan(freq) or freq <= 0: + self.console.print( + "[yellow][STPH] No frequency found - resetting Page-Hinkley state[/]" + ) + self._reset_detector_state() + return False, 0.0, {} + + self._update_adaptive_parameters(freq) + + if self.shared_resources: + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + self.shared_resources.detector_frequencies.append(freq) + self.shared_resources.detector_timestamps.append(timestamp or 0.0) + else: + self.shared_resources.detector_frequencies.append(freq) + self.shared_resources.detector_timestamps.append(timestamp or 0.0) + + if self.sample_count == 0: + self.sample_count = 1 + self.reference_mean = freq + self.sum_of_samples = freq + if self.show_init: + self.console.print( + f"[yellow][STPH] Reference mean initialized: {self.reference_mean:.3f} Hz[/]" + ) + else: + self.sample_count += 1 + self.sum_of_samples += freq + self.reference_mean = self.sum_of_samples / self.sample_count + + pos_difference = freq - self.reference_mean - self.adaptive_delta + old_cumsum_pos = self.cumulative_sum_pos + self.cumulative_sum_pos = max(0, self.cumulative_sum_pos + pos_difference) + + neg_difference = self.reference_mean - freq - self.adaptive_delta + old_cumsum_neg = self.cumulative_sum_neg + self.cumulative_sum_neg = max(0, self.cumulative_sum_neg + neg_difference) + + if self.verbose: + self.console.print( + f"[dim magenta][STPH DEBUG] Sample #{self.sample_count}:[/]" + ) + self.console.print(f" [dim]• Current freq: {freq:.3f} Hz[/]") + self.console.print( + f" [dim]• Reference mean: {self.reference_mean:.3f} Hz[/]" + ) + self.console.print(f" [dim]• Adaptive delta: {self.adaptive_delta:.3f}[/]") + self.console.print( + f" [dim]• Positive difference: {freq:.3f} - {self.reference_mean:.3f} - {self.adaptive_delta:.3f} = {pos_difference:.3f}[/]" + ) + self.console.print( + f" [dim]• Sum_pos = max(0, {old_cumsum_pos:.3f} + {pos_difference:.3f}) = {self.cumulative_sum_pos:.3f}[/]" + ) + self.console.print( + f" [dim]• Negative difference: {self.reference_mean:.3f} - {freq:.3f} - {self.adaptive_delta:.3f} = {neg_difference:.3f}[/]" + ) + self.console.print( + f" [dim]• Sum_neg = max(0, {old_cumsum_neg:.3f} + {neg_difference:.3f}) = {self.cumulative_sum_neg:.3f}[/]" + ) + self.console.print( + f" [dim]• Adaptive threshold: {self.adaptive_threshold:.3f}[/]" + ) + self.console.print( + f" [dim]• 
Upward change test: {self.cumulative_sum_pos:.3f} > {self.adaptive_threshold:.3f} = {'UPWARD CHANGE!' if self.cumulative_sum_pos > self.adaptive_threshold else 'No change'}[/]" + ) + self.console.print( + f" [dim]• Downward change test: {self.cumulative_sum_neg:.3f} > {self.adaptive_threshold:.3f} = {'DOWNWARD CHANGE!' if self.cumulative_sum_neg > self.adaptive_threshold else 'No change'}[/]" + ) + + if self.shared_resources and hasattr(self.shared_resources, "detector_state"): + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + self.shared_resources.detector_state.update( + { + "cumulative_sum_pos": self.cumulative_sum_pos, + "cumulative_sum_neg": self.cumulative_sum_neg, + "reference_mean": self.reference_mean, + "sum_of_samples": self.sum_of_samples, + "sample_count": self.sample_count, + "initialized": True, + } + ) + else: + self.shared_resources.detector_state.update( + { + "cumulative_sum_pos": self.cumulative_sum_pos, + "cumulative_sum_neg": self.cumulative_sum_neg, + "reference_mean": self.reference_mean, + "sum_of_samples": self.sum_of_samples, + "sample_count": self.sample_count, + "initialized": True, + } + ) + + if self.shared_resources and hasattr( + self.shared_resources, "detector_frequencies" + ): + sample_count = len(self.shared_resources.detector_frequencies) + else: + sample_count = len(self.frequency_buffer) + + if sample_count < 3 or self.adaptive_threshold <= 0: + return False, 0.0, {} + + upward_change = self.cumulative_sum_pos > self.adaptive_threshold + downward_change = self.cumulative_sum_neg > self.adaptive_threshold + change_detected = upward_change or downward_change + + if upward_change: + change_type = "increase" + triggering_sum = self.cumulative_sum_pos + elif downward_change: + change_type = "decrease" + triggering_sum = self.cumulative_sum_neg + else: + change_type = "none" + triggering_sum = max(self.cumulative_sum_pos, self.cumulative_sum_neg) + + if change_detected: + magnitude = abs(freq - self.reference_mean) + percent_change = ( + (magnitude / self.reference_mean * 100) if self.reference_mean > 0 else 0 + ) + + self.console.print( + f"[bold magenta][STPH] CHANGE DETECTED! 
" + f"{self.reference_mean:.3f}Hz → {freq:.3f}Hz " + f"({percent_change:.1f}% {change_type})[/]" + ) + self.console.print( + f"[magenta][STPH] Sum_pos={self.cumulative_sum_pos:.2f}, Sum_neg={self.cumulative_sum_neg:.2f}, " + f"Adaptive_Threshold={self.adaptive_threshold:.3f} (σ={self.rolling_std:.3f})[/]" + ) + self.console.print( + f"[dim magenta]STPH ANALYSIS: Cumulative sum exceeded adaptive threshold {self.adaptive_threshold:.2f}[/]" + ) + self.console.print( + f"[dim magenta]Detection method: {'Positive sum (upward trend)' if upward_change else 'Negative sum (downward trend)'}[/]" + ) + self.console.print( + f"[dim magenta]Adaptive minimum detectable change: {self.adaptive_delta:.3f}[/]" + ) + + if self.shared_resources and hasattr( + self.shared_resources, "detector_change_count" + ): + if hasattr(self.shared_resources, "detector_lock"): + with self.shared_resources.detector_lock: + self.shared_resources.detector_change_count.value += 1 + else: + self.shared_resources.detector_change_count.value += 1 + + current_window_size = ( + len(self.shared_resources.detector_frequencies) + if self.shared_resources + else self.sample_count + ) + + metadata = { + "cumulative_sum_pos": self.cumulative_sum_pos, + "cumulative_sum_neg": self.cumulative_sum_neg, + "triggering_sum": triggering_sum, + "change_type": change_type, + "reference_mean": self.reference_mean, + "frequency": freq, + "window_size": current_window_size, + "threshold": self.adaptive_threshold, + "adaptive_delta": self.adaptive_delta, + "rolling_std": self.rolling_std, + } + + return change_detected, triggering_sum, metadata + + +def detect_pattern_change_pagehinkley( + shared_resources, + current_prediction: Prediction, + detector: SelfTuningPageHinkleyDetector, + counter: int, +) -> tuple[bool, str | None, float, float | None, float | None]: + + import numpy as np + + current_freq = get_dominant(current_prediction) + current_time = current_prediction.t_end + + if current_freq is None or np.isnan(current_freq): + detector._reset_detector_state() + return False, None, current_prediction.t_start, None, None + + change_detected, triggering_sum, metadata = detector.add_frequency( + current_freq, current_time + ) + + if change_detected: + detector.reset(current_freq=current_freq) + + change_type = metadata.get("change_type", "unknown") + frequency = metadata.get("frequency", current_freq) + reference_mean = metadata.get("reference_mean", 0.0) + window_size = metadata.get("window_size", 0) + + magnitude = abs(frequency - reference_mean) + percent_change = (magnitude / reference_mean * 100) if reference_mean > 0 else 0 + + direction_arrow = ( + "increasing" + if change_type == "increase" + else "decreasing" if change_type == "decrease" else "stable" + ) + log_message = ( + f"[bold red][Page-Hinkley] PAGE-HINKLEY CHANGE DETECTED! 
{direction_arrow} " + f"{reference_mean:.1f}Hz → {frequency:.1f}Hz " + f"(Δ={magnitude:.1f}Hz, {percent_change:.1f}% {change_type}) " + f"at sample {window_size}, time={current_time:.3f}s[/]\n" + f"[red][Page-Hinkley] Page-Hinkley stats: sum_pos={metadata.get('cumulative_sum_pos', 0):.2f}, " + f"sum_neg={metadata.get('cumulative_sum_neg', 0):.2f}, threshold={detector.adaptive_threshold:.3f}[/]\n" + f"[red][Page-Hinkley] Cumulative sum exceeded threshold -> Starting fresh analysis[/]" + ) + + adaptive_start_time = current_time + if hasattr(shared_resources, "detector_last_change_time"): + shared_resources.detector_last_change_time.value = current_time + + logger = shared_resources.logger if hasattr(shared_resources, "logger") else None + if logger: + logger.send_log( + "change_point", + "Page-Hinkley Change Point Detected", + { + "algorithm": "PageHinkley", + "frequency": frequency, + "reference_mean": reference_mean, + "magnitude": magnitude, + "percent_change": percent_change, + "triggering_sum": triggering_sum, + "change_type": change_type, + "position": window_size, + "timestamp": current_time, + "threshold": detector.adaptive_threshold, + "delta": detector.adaptive_delta, + "prediction_counter": counter, + }, + ) + + return True, log_message, adaptive_start_time, reference_mean, frequency + + return False, None, current_prediction.t_start, None, None diff --git a/ftio/prediction/online_analysis.py b/ftio/prediction/online_analysis.py index d48e200..f37f928 100644 --- a/ftio/prediction/online_analysis.py +++ b/ftio/prediction/online_analysis.py @@ -2,6 +2,9 @@ from __future__ import annotations +import json +import socket +import time from argparse import Namespace import numpy as np @@ -10,55 +13,286 @@ from ftio.cli import ftio_core from ftio.freq.prediction import Prediction from ftio.plot.units import set_unit +from ftio.prediction.change_point_detection import ( + ChangePointDetector, + CUSUMDetector, + SelfTuningPageHinkleyDetector, + detect_pattern_change_adwin, + detect_pattern_change_cusum, + detect_pattern_change_pagehinkley, +) from ftio.prediction.helper import get_dominant from ftio.prediction.shared_resources import SharedResources +class SocketLogger: + """TCP socket client for sending prediction and change point data to the GUI dashboard. + + Establishes a connection to the dashboard server and sends JSON-formatted messages + containing prediction results, change point detections, and other log data for + real-time visualization. + + Attributes: + host: The hostname of the GUI server (default: "localhost"). + port: The port number of the GUI server (default: 9999). + socket: The TCP socket connection. + connected: Boolean indicating if currently connected to the server. + """ + + def __init__(self, host: str = "localhost", port: int = 9999): + """Initialize the socket logger and attempt connection to the GUI server. + + Args: + host: The hostname of the GUI server. + port: The port number of the GUI server. + """ + self.host = host + self.port = port + self.socket = None + self.connected = False + self._connect() + + def _connect(self): + """Attempt to establish a TCP connection to the GUI dashboard server. + + Creates a socket with a 1-second timeout and attempts to connect to the + SocketListener running in the GUI dashboard process. If connection fails + (e.g., GUI not running), sets connected=False and continues without GUI + logging - predictions still work, just without real-time visualization. 
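+
+        A minimal usage sketch (illustrative only; assumes a dashboard is
+        already listening on localhost:9999):
+
+            logger = SocketLogger()            # connects on construction
+            if logger.connected:
+                logger.send_log("info", "predictor online")
+            logger.close()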
+ + The connection is optional: the predictor works fine without the GUI. + """ + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.settimeout(1.0) # 1 second timeout + self.socket.connect((self.host, self.port)) + self.connected = True + print(f"[INFO] Connected to GUI server at {self.host}:{self.port}") + except (TimeoutError, OSError, ConnectionRefusedError) as e: + self.connected = False + if self.socket: + self.socket.close() + self.socket = None + print( + f"[WARNING] Failed to connect to GUI server at {self.host}:{self.port}: {e}" + ) + print("[WARNING] GUI logging disabled - messages will only appear in console") + + def send_log(self, log_type: str, message: str, data: dict = None): + """Send a log message to the GUI dashboard for visualization. + + Constructs a JSON message with timestamp, type, message, and optional data, + then sends it over the TCP socket. If sending fails, marks the connection + as closed and stops further send attempts. + + Args: + log_type: Category of the message. Common types: + - "prediction": New FTIO prediction result + - "change_point": Change point detection event + - "info": General information message + message: Human-readable description of the event. + data: Dictionary containing structured data for the GUI to display. + For predictions, includes frequency, confidence, time window, etc. + For change points, includes old/new frequency and detection time. + """ + if not self.connected: + return + + try: + log_data = { + "timestamp": time.time(), + "type": log_type, + "message": message, + "data": data or {}, + } + + json_data = json.dumps(log_data) + "\n" + self.socket.send(json_data.encode("utf-8")) + + except (OSError, BrokenPipeError, ConnectionResetError) as e: + print(f"[WARNING] Failed to send to GUI: {e}") + self.connected = False + if self.socket: + self.socket.close() + self.socket = None + + def close(self): + """Close the socket connection to the GUI server. + + Safe to call multiple times. After closing, no more messages can be sent. 
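+
+        Example (sketch):
+
+            logger = SocketLogger()
+            try:
+                logger.send_log("info", "run finished")
+            finally:
+                logger.close()  # safe even if the connection never opened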
+        """
+        if self.socket:
+            self.socket.close()
+            self.socket = None
+        self.connected = False
+
+
+_socket_logger = None
+_gui_enabled = False
+
+
+def init_socket_logger(gui_enabled: bool = False):
+    """Initialize the socket logger based on the --gui flag."""
+    global _socket_logger, _gui_enabled
+    _gui_enabled = gui_enabled
+    if gui_enabled:
+        _socket_logger = SocketLogger()
+    else:
+        _socket_logger = None
+
+
+def get_socket_logger():
+    """Return the shared SocketLogger, creating it lazily when the GUI is enabled."""
+    global _socket_logger, _gui_enabled
+    if not _gui_enabled:
+        return None
+    if _socket_logger is None:
+        _socket_logger = SocketLogger()
+    return _socket_logger
+
+
+def strip_rich_formatting(text: str) -> str:
+    """Remove rich markup tags (e.g. [bold red], [dim], [/]) before sending text to the GUI."""
+    import re
+
+    # Match single- and multi-word lowercase style tags as well as the [/] closer;
+    # uppercase labels such as [PREDICTOR] or [ADWIN] are deliberately kept.
+    clean_text = re.sub(r"\[/?[a-z][a-z ]*\]|\[/\]", "", text)
+
+    return clean_text
+
+
+def log_to_gui_and_console(
+    console: Console, message: str, log_type: str = "info", data: dict | None = None
+):
+    """Print `message` to the console and forward a markup-free copy to the GUI (if enabled)."""
+    logger = get_socket_logger()
+    clean_message = strip_rich_formatting(message)
+
+    console.print(message)
+
+    if logger is not None:
+        logger.send_log(log_type, clean_message, data)
+
+
+def get_change_detector(shared_resources: SharedResources, algorithm: str = "adwin"):
+    """Return the change point detector for `algorithm`, creating and caching it on first use."""
+    algo = (algorithm or "adwin").lower()
+    detector_key = f"{algo}_detector"
+
+    # Cache detector on shared_resources instead of using a global
+    if not hasattr(shared_resources, "_detector_cache"):
+        shared_resources._detector_cache = {}
+
+    if detector_key in shared_resources._detector_cache:
+        return shared_resources._detector_cache[detector_key]
+
+    show_init_message = not shared_resources.detector_initialized.value
+
+    if algo == "cusum":
+        detector = CUSUMDetector(
+            window_size=50,
+            shared_resources=shared_resources,
+            show_init=show_init_message,
+            verbose=True,
+        )
+    elif algo == "ph":
+        detector = SelfTuningPageHinkleyDetector(
+            shared_resources=shared_resources, show_init=show_init_message, verbose=True
+        )
+    else:
+        detector = ChangePointDetector(
+            delta=0.05,
+            shared_resources=shared_resources,
+            show_init=show_init_message,
+            verbose=True,
+        )
+
+    shared_resources._detector_cache[detector_key] = detector
+    shared_resources.detector_initialized.value = True
+    return detector
+
+
 def ftio_process(shared_resources: SharedResources, args: list[str], msgs=None) -> None:
     """Perform a single prediction
 
     Args:
         shared_resources (SharedResources): shared resources among processes
         args (list[str]): additional arguments passed to ftio
+        msgs: ZMQ messages (optional)
     """
     console = Console()
-    console.print(f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Started")
+    pred_id = shared_resources.count.value
+    start_msg = f"[purple][PREDICTOR] (#{pred_id}):[/] Started"
+    log_to_gui_and_console(console, start_msg, "predictor_start", {"count": pred_id})
 
     # Modify the arguments
     args.extend(["-e", "no"])
     args.extend(["-ts", f"{shared_resources.start_time.value:.2f}"])
 
     # perform prediction
-    prediction, parsed_args = ftio_core.main(args, msgs)
-    if not prediction:
-        console.print("[yellow]Terminating prediction (no data passed) [/]")
-        console.print(
-            f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Stopped"
-        )
-        exit(0)
-
-    if not isinstance(prediction, list) or len(prediction) != 1:
-        raise ValueError(
-            "[red][PREDICTOR] (#{shared_resources.count.value}):[/] predictor should be called on exactly on file"
-        )
+    prediction_list, parsed_args = ftio_core.main(args, msgs)
+    if not prediction_list:
+        log_to_gui_and_console(
+            console,
+            "[yellow]Terminating 
prediction (no data passed)[/]", + "termination", + {"reason": "no_data"}, ) + return # get the prediction - prediction = prediction[-1] + prediction = prediction_list[-1] # plot_bar_with_rich(shared_resources.t_app,shared_resources.b_app, width_percentage=0.9) # get data - freq = get_dominant(prediction) # just get a single dominant value + freq = get_dominant(prediction) or 0.0 # just get a single dominant value # save prediction results save_data(prediction, shared_resources) # display results text = display_result(freq, prediction, shared_resources) - # data analysis to decrease window thus change start_time - text += window_adaptation(parsed_args, prediction, freq, shared_resources) - + # Get change point info directly from window_adaptation + adaptation_text, is_change_point, change_point_info = window_adaptation( + parsed_args, prediction, freq, shared_resources + ) + text += adaptation_text + candidates = [ + {"frequency": f, "confidence": c} + for f, c in zip(prediction.dominant_freq, prediction.conf, strict=True) + ] + if candidates: + best = max(candidates, key=lambda c: c["confidence"]) + dominant_freq = best["frequency"] + dominant_period = 1.0 / dominant_freq if dominant_freq > 0 else 0.0 + confidence = best["confidence"] + else: + dominant_freq = dominant_period = confidence = 0.0 + + structured_prediction = { + "prediction_id": pred_id, + "timestamp": str(time.time()), + "dominant_freq": dominant_freq, + "dominant_period": dominant_period, + "confidence": confidence, + "candidates": candidates, + "time_window": (float(prediction.t_start), float(prediction.t_end)), + "total_bytes": str(prediction.total_bytes), + "bytes_transferred": str(prediction.total_bytes), + "current_hits": int(shared_resources.hits.value), + "periodic_probability": 0.0, + "frequency_range": (0.0, 0.0), + "period_range": (0.0, 0.0), + "is_change_point": is_change_point, + "change_point": change_point_info, + } + + logger = get_socket_logger() + if logger is not None: + logger.send_log("prediction", "FTIO structured prediction", structured_prediction) # print text - console.print(text) + log_to_gui_and_console( + console, text, "prediction_log", {"count": pred_id, "freq": dominant_freq} + ) + + shared_resources.count.value += 1 def window_adaptation( @@ -66,18 +300,17 @@ def window_adaptation( prediction: Prediction, freq: float, shared_resources: SharedResources, -) -> str: - """modifies the start time if conditions are true +) -> tuple[str, bool, dict]: + """Modifies the start time if conditions are true. Also performs change point detection. 
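+
+    The detector is selected via ``args.online_adaptation`` ("adwin", "cusum",
+    or "ph"). Hit-based window shrinking only runs when the selected detector
+    reported no change for the current prediction.
+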
     Args:
         args (argparse): command line arguments
         prediction (Prediction): result from FTIO
-        freq (float|Nan): dominant frequency
+        freq (float | NaN): dominant frequency
         shared_resources (SharedResources): shared resources among processes
-        text (str): text to display
 
     Returns:
-        str: _description_
+        tuple: (text, is_change_point, change_point_info)
     """
     # average data/data processing
     text = ""
@@ -85,15 +318,113 @@
     t_e = prediction.t_end
     total_bytes = prediction.total_bytes
 
+    prediction_count = shared_resources.count.value
+    text += f"Prediction #{prediction_count}\n"
+
     # Hits
     text += hits(args, prediction, shared_resources)
 
+    algorithm = args.online_adaptation
+
+    # Change point detection - capture data directly
+    detector = get_change_detector(shared_resources, algorithm)
+    if algorithm == "cusum":
+        change_detected, change_log, adaptive_start_time, old_freq, new_freq = (
+            detect_pattern_change_cusum(
+                shared_resources, prediction, detector, shared_resources.count.value
+            )
+        )
+    elif algorithm == "ph":
+        change_detected, change_log, adaptive_start_time, old_freq, new_freq = (
+            detect_pattern_change_pagehinkley(
+                shared_resources, prediction, detector, shared_resources.count.value
+            )
+        )
+    else:
+        change_detected, change_log, adaptive_start_time, old_freq, new_freq = (
+            detect_pattern_change_adwin(
+                shared_resources, prediction, detector, shared_resources.count.value
+            )
+        )
+
+    # Build change point info directly - no regex needed
+    change_point_info = None
+    if change_detected:
+        old_freq_val = (
+            float(old_freq) if old_freq is not None and not np.isnan(old_freq) else 0.0
+        )
+        new_freq_val = (
+            float(new_freq) if new_freq is not None and not np.isnan(new_freq) else 0.0
+        )
+        freq_change_pct = (
+            abs(new_freq_val - old_freq_val) / old_freq_val * 100
+            if old_freq_val > 0
+            else 0.0
+        )
+        sample_count = len(shared_resources.detector_frequencies)
+        change_point_info = {
+            "prediction_id": shared_resources.count.value,
+            "timestamp": float(prediction.t_end),
+            "old_frequency": old_freq_val,
+            "new_frequency": new_freq_val,
+            "frequency_change_percent": freq_change_pct,
+            "sample_number": sample_count,
+            "cut_position": sample_count - 1 if sample_count > 0 else 0,
+            "total_samples": sample_count,
+            "start_time": float(adaptive_start_time),
+        }
+
+    if np.isnan(freq):
+        detector_samples = len(shared_resources.detector_frequencies)
+        detector_changes = shared_resources.detector_change_count.value
+        text += f"[dim][{algorithm.upper()} STATE: {detector_samples} samples, {detector_changes} changes detected so far][/]\n"
+        if detector_samples > 0:
+            # The guard above ensures the list is non-empty, so index directly
+            last_freq = shared_resources.detector_frequencies[-1]
+            text += f"[dim][LAST KNOWN FREQ: {last_freq:.3f} Hz][/]\n"
+
+    if change_detected and change_log:
+        text += f"{change_log}\n"
+        min_window_size = 1.0
+        safe_adaptive_start = min(adaptive_start_time, t_e - min_window_size)
+
+        if safe_adaptive_start >= 0 and (t_e - safe_adaptive_start) >= min_window_size:
+            t_s = safe_adaptive_start
+            algorithm_name = (
+                args.online_adaptation.upper()
+                if hasattr(args, "online_adaptation")
+                else "UNKNOWN"
+            )
+            text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/][green] {algorithm_name} adapted window to start at {t_s:.3f}s (window size: {t_e - t_s:.3f}s)[/]\n"
+        else:
+            t_s = max(0, t_e - min_window_size)
+            algorithm_name = (
+                args.online_adaptation.upper()
+                if hasattr(args, "online_adaptation")
+                else "UNKNOWN"
+            )
+            text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/][yellow] 
{algorithm_name} adaptation would create unsafe window, using conservative {min_window_size}s window[/]\n" + # time window adaptation - if not np.isnan(freq): - n_phases = (t_e - t_s) * freq - avr_bytes = int(total_bytes / float(n_phases)) - unit, order = set_unit(avr_bytes, "B") - avr_bytes = order * avr_bytes + if not np.isnan(freq) and freq > 0: + time_window = t_e - t_s + if time_window > 0: + n_phases = time_window * freq + if n_phases > 0: + avr_bytes = int(total_bytes / float(n_phases)) + unit, order = set_unit(avr_bytes, "B") + avr_bytes = order * avr_bytes + else: + n_phases = 0 + avr_bytes = 0 + unit = "B" + else: + n_phases = 0 + avr_bytes = 0 + unit = "B" # FIXME this needs to compensate for a smaller windows if not args.window_adaptation: @@ -103,37 +434,74 @@ def window_adaptation( ) # adaptive time window - if "frequency_hits" in args.window_adaptation: + if "frequency_hits" in args.window_adaptation and not change_detected: if shared_resources.hits.value > args.hits: - if ( - True - ): # np.abs(avr_bytes - (total_bytes-aggregated_bytes.value)) < 100: + if True: tmp = t_e - 3 * 1 / freq t_s = tmp if tmp > 0 else 0 text += f"[bold purple][PREDICTOR] (#{shared_resources.count.value}):[/][green]Adjusting start time to {t_s} sec\n[/]" else: - t_s = 0 - if shared_resources.hits.value == 0: - text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/][red bold] Resetting start time to {t_s} sec\n[/]" - elif "data" in args.window_adaptation and len(shared_resources.data) > 0: - text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/][green]Trying time window adaptation: {shared_resources.count.value:.0f} =? { args.hits * shared_resources.hits.value:.0f}\n[/]" + if not change_detected: + t_s = 0 + if shared_resources.hits.value == 0: + text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/][red bold] Resetting start time to {t_s} sec\n[/]" + elif ( + "data" in args.window_adaptation + and len(shared_resources.data) > 0 + and not change_detected + ): + text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/][green]Trying time window adaptation: {shared_resources.count.value:.0f} =? 
{args.hits * shared_resources.hits.value:.0f}\n[/]" if shared_resources.count.value == args.hits * shared_resources.hits.value: # t_s = shared_resources.data[-shared_resources.count.value]['t_start'] - # text += f'[bold purple][PREDICTOR] (#{shared_resources.count.value}):[/][green]Adjusting start time to t_start {t_s} sec\n[/]' + # text += f'[bold purple][PREDICTOR] (#{shared_resources.count.value}):[/][green] Adjusting start time to t_start {t_s} sec\n[/]' if len(shared_resources.t_flush) > 0: print(shared_resources.t_flush) index = int(args.hits * shared_resources.hits.value - 1) t_s = shared_resources.t_flush[index] text += f"[bold purple][PREDICTOR] (#{shared_resources.count.value}):[/][green]Adjusting start time to t_flush[{index}] {t_s} sec\n[/]" + if not np.isnan(freq): + samples = len(shared_resources.detector_frequencies) + changes = shared_resources.detector_change_count.value + recent_freqs = ( + list(shared_resources.detector_frequencies)[-5:] + if len(shared_resources.detector_frequencies) >= 5 + else list(shared_resources.detector_frequencies) + ) + + success_rate = (samples / prediction_count) * 100 if prediction_count > 0 else 0 + + text += f"\n[bold cyan]{algorithm.upper()} ANALYSIS (Prediction #{prediction_count})[/]\n" + text += f"[cyan]Frequency detections: {samples}/{prediction_count} ({success_rate:.1f}% success)[/]\n" + text += f"[cyan]Pattern changes detected: {changes}[/]\n" + text += f"[cyan]Current frequency: {freq:.3f} Hz ({1 / freq:.2f}s period)[/]\n" + + if samples > 1: + text += ( + f"[cyan]Recent freq history: {[f'{f:.3f}Hz' for f in recent_freqs]}[/]\n" + ) + + if len(recent_freqs) >= 2: + trend = ( + "increasing" + if recent_freqs[-1] > recent_freqs[-2] + else "decreasing" if recent_freqs[-1] < recent_freqs[-2] else "stable" + ) + text += f"[cyan]Frequency trend: {trend}[/]\n" + + text += f"[cyan]{algorithm.upper()} window size: {samples} samples[/]\n" + text += f"[cyan]{algorithm.upper()} changes detected: {changes}[/]\n" + + text += f"[bold cyan]{'=' * 50}[/]\n\n" + # TODO 1: Make sanity check -- see if the same number of bytes was transferred # TODO 2: Train a model to validate the predictions? text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Ended" shared_resources.start_time.value = t_s - return text + return text, change_detected, change_point_info -def save_data(prediction, shared_resources) -> None: +def save_data(prediction: Prediction, shared_resources) -> None: """Put all data from `prediction` in a `queue`. The total bytes are as well saved here. 
Args: @@ -177,20 +545,23 @@ def display_result( text = "" # Dominant frequency if not np.isnan(freq): - text = f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Dominant freq {freq:.3f} Hz ({1/freq if freq != 0 else 0:.2f} sec)\n" + text = f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Dominant freq {freq:.3f} Hz ({1 / freq if freq != 0 else 0:.2f} sec)\n" + else: + text = f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] No dominant frequency found\n" # Candidates - text += ( - f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Freq candidates: \n" - ) - for i, f_d in enumerate(prediction.dominant_freq): - text += ( - f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] {i}) " - f"{f_d:.2f} Hz -- conf {prediction.conf[i]:.2f}\n" - ) + if len(prediction.dominant_freq) > 0: + text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Freq candidates ({len(prediction.dominant_freq)} found): \n" + for i, f_d in enumerate(prediction.dominant_freq): + text += ( + f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] {i}) " + f"{f_d:.2f} Hz -- conf {prediction.conf[i]:.2f}\n" + ) + else: + text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] No frequency candidates detected\n" # time window - text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Time window {prediction.t_end-prediction.t_start:.3f} sec ([{prediction.t_start:.3f},{prediction.t_end:.3f}] sec)\n" + text += f"[purple][PREDICTOR] (#{shared_resources.count.value}):[/] Time window {prediction.t_end - prediction.t_start:.3f} sec ([{prediction.t_start:.3f},{prediction.t_end:.3f}] sec)\n" # total bytes total_bytes = shared_resources.aggregated_bytes.value diff --git a/ftio/prediction/probability_analysis.py b/ftio/prediction/probability_analysis.py index d7498f0..15bf3bb 100644 --- a/ftio/prediction/probability_analysis.py +++ b/ftio/prediction/probability_analysis.py @@ -73,3 +73,59 @@ def find_probability(data: list[dict], method: str = "db", counter: int = -1) -> out.append(prob) return out + + +def detect_pattern_change(shared_resources, prediction, detector, count): + """Detect pattern changes in predictions. + + Args: + shared_resources: Shared resources for multiprocessing + prediction: Current prediction object + detector: Change point detector instance + count: Prediction counter + + Returns: + tuple: (change_detected, change_log, start_time) + """ + if prediction is None or detector is None: + return False, "", 0.0 + + freq = get_dominant(prediction) + + if freq is None or np.isnan(freq): + return False, "", getattr(prediction, "t_start", 0.0) + + current_time = getattr(prediction, "t_end", 0.0) + + if hasattr(detector, "verbose") and detector.verbose: + console = Console() + console.print( + f"[cyan][DEBUG] Change point detection called for prediction #{count}, freq={freq:.3f} Hz[/]" + ) + frequencies = getattr(detector, "frequencies", []) + is_calibrated = getattr(detector, "is_calibrated", False) + console.print( + f"[cyan][DEBUG] Detector calibrated: {is_calibrated}, samples: {len(frequencies)}[/]" + ) + + result = detector.add_prediction(prediction, current_time) + + if hasattr(detector, "verbose") and detector.verbose: + console = Console() + console.print(f"[cyan][DEBUG] Detector result: {result}[/]") + + if result is not None: + change_point_idx, change_point_time = result + + if hasattr(detector, "verbose") and detector.verbose: + console = Console() + console.print( + f"[green][DEBUG] CHANGE POINT DETECTED! 
Index: {change_point_idx}, Time: {change_point_time:.3f}[/]" + ) + + change_log = f"[red bold][CHANGE_POINT] t_s={change_point_time:.3f} sec[/]" + change_log += f"\n[purple][PREDICTOR] (#{count}):[/][yellow] Adapting analysis window to start at t_s={change_point_time:.3f}[/]" + + return True, change_log, change_point_time + + return False, "", getattr(prediction, "t_start", 0.0) diff --git a/ftio/prediction/shared_resources.py b/ftio/prediction/shared_resources.py index 45b21f9..71791d2 100644 --- a/ftio/prediction/shared_resources.py +++ b/ftio/prediction/shared_resources.py @@ -26,9 +26,20 @@ def _init_shared_resources(self): self.t_app = self.manager.list() # For triggering cargo self.sync_trigger = self.manager.Queue() - # saves when the dada ti received from gkfs + # saves when the data is received from gkfs self.t_flush = self.manager.list() + # Change point detection shared state (used by all algorithms: ADWIN, CUSUM, Page-Hinkley) + self.detector_frequencies = self.manager.list() + self.detector_timestamps = self.manager.list() + self.detector_total_samples = self.manager.Value("i", 0) + self.detector_change_count = self.manager.Value("i", 0) + self.detector_last_change_time = self.manager.Value("d", 0.0) + self.detector_initialized = self.manager.Value("b", False) + self.detector_lock = self.manager.Lock() + # Algorithm-specific state (e.g., Page-Hinkley cumulative sums) + self.detector_state = self.manager.dict() + def restart(self): """Restart the manager and reinitialize shared resources.""" print("Shutting down existing Manager...") diff --git a/pyproject.toml b/pyproject.toml index 75ef320..df3a5fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,9 @@ jit = "ftio.api.gekkoFs.jit.jit:main" parallel_trace_analysis = "ftio.api.trace_analysis.parallel_trace_analysis:main" # analyses several traces (JSON or CSV) in parallel proxy_ftio = "ftio.api.metric_proxy.parallel_proxy:main" +# GUI +ftio_gui = "ftio.gui.dashboard:main" + # Debug-Specific Scripts plot_bandwdith = "ftio.plot.plot_bandwidth:main" convert_trace = "ftio.util.convert_old_trace:main" @@ -98,6 +101,10 @@ external-libs = [ "colorlog", ] +gui = [ + "dash", +] + development-libs = [ "black", "isort", @@ -120,9 +127,6 @@ plot-libs = [ "trace_updater", ] -[project.gui-scripts] - - [tool.setuptools.dynamic] version = { attr = "ftio.__version__" } readme = { file = ["README.md"] } diff --git a/test/test_change_point_detection.py b/test/test_change_point_detection.py new file mode 100644 index 0000000..29d19c0 --- /dev/null +++ b/test/test_change_point_detection.py @@ -0,0 +1,289 @@ +""" +Tests for change point detection algorithms (ADWIN, CUSUM, Page-Hinkley). + +Author: Amine Aherbil +Copyright (c) 2025 TU Darmstadt, Germany +Date: January 2025 + +Licensed under the BSD 3-Clause License. 
+For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + +from unittest.mock import MagicMock + +import numpy as np + +from ftio.freq.prediction import Prediction +from ftio.prediction.change_point_detection import ( + ChangePointDetector, + CUSUMDetector, + SelfTuningPageHinkleyDetector, +) + + +def create_mock_prediction(freq: float, t_start: float, t_end: float) -> MagicMock: + """Create a mock Prediction object with specified frequency.""" + pred = MagicMock(spec=Prediction) + pred.dominant_freq = np.array([freq]) + pred.t_start = t_start + pred.t_end = t_end + # Mock get_dominant_freq() to return scalar (used by get_dominant() helper) + pred.get_dominant_freq.return_value = freq + return pred + + +class TestADWINDetector: + """Test cases for ADWIN change point detector.""" + + def test_initialization(self): + """Test ADWIN detector initializes correctly.""" + detector = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + assert detector.delta == 0.05 + assert detector.min_window_size == 2 + + def test_no_change_stable_frequency(self): + """Test that stable frequencies don't trigger change detection.""" + detector = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + + # Add stable frequency predictions + for i in range(10): + pred = create_mock_prediction(freq=0.5, t_start=i, t_end=i + 1) + _ = detector.add_prediction(pred, timestamp=float(i + 1)) + + # Should not detect change with stable frequency + assert detector._get_change_count() == 0 + + def test_detects_frequency_change(self): + """Test that significant frequency change is detected.""" + detector = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + + # Add low frequency predictions (more samples for statistical significance) + for i in range(10): + pred = create_mock_prediction(freq=0.1, t_start=i, t_end=i + 1) + detector.add_prediction(pred, timestamp=float(i + 1)) + + # Add high frequency predictions (significant change: 0.1 -> 10 Hz) + change_detected = False + for i in range(10, 30): + pred = create_mock_prediction(freq=10.0, t_start=i, t_end=i + 1) + result = detector.add_prediction(pred, timestamp=float(i + 1)) + if result is not None: + change_detected = True + + # Should detect the change during the loop or in the count + assert change_detected or detector._get_change_count() >= 1 + + def test_reset_on_nan_frequency(self): + """Test that NaN frequency resets the detector window.""" + detector = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + + # Add some predictions + for i in range(5): + pred = create_mock_prediction(freq=0.5, t_start=i, t_end=i + 1) + detector.add_prediction(pred, timestamp=float(i + 1)) + + # Add NaN frequency + pred = create_mock_prediction(freq=np.nan, t_start=5, t_end=6) + detector.add_prediction(pred, timestamp=6.0) + + # Window should be reset + assert len(detector._get_frequencies()) == 0 + + def test_window_stats(self): + """Test window statistics calculation.""" + detector = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + + # Add predictions + freqs = [0.5, 0.6, 0.4, 0.5, 0.55] + for i, f in enumerate(freqs): + pred = create_mock_prediction(freq=f, t_start=i, t_end=i + 1) + detector.add_prediction(pred, timestamp=float(i + 1)) + + stats = detector.get_window_stats() + assert stats["size"] == 5 + assert abs(stats["mean"] - np.mean(freqs)) < 0.001 + + +class TestCUSUMDetector: + """Test cases for CUSUM 
change point detector.""" + + def test_initialization(self): + """Test CUSUM detector initializes correctly.""" + detector = CUSUMDetector(window_size=50, shared_resources=None, show_init=False) + assert detector.window_size == 50 + assert detector.sum_pos == 0.0 + assert detector.sum_neg == 0.0 + + def test_reference_establishment(self): + """Test that reference is established from initial samples.""" + detector = CUSUMDetector(window_size=50, shared_resources=None, show_init=False) + + # Add initial samples + freqs = [0.5, 0.5, 0.5] + for f in freqs: + detector.add_frequency(f, timestamp=0.0) + + assert detector.initialized + assert abs(detector.reference - 0.5) < 0.001 + + def test_detects_upward_change(self): + """Test detection of upward frequency shift.""" + detector = CUSUMDetector(window_size=50, shared_resources=None, show_init=False) + + # Establish baseline + for i in range(5): + detector.add_frequency(0.1, timestamp=float(i)) + + # Introduce upward shift + change_detected = False + for i in range(5, 20): + detected, info = detector.add_frequency(1.0, timestamp=float(i)) + if detected: + change_detected = True + break + + assert change_detected + + def test_reset_on_nan(self): + """Test that NaN frequency resets CUSUM state.""" + detector = CUSUMDetector(window_size=50, shared_resources=None, show_init=False) + + # Add some frequencies + for i in range(5): + detector.add_frequency(0.5, timestamp=float(i)) + + # Add NaN + detector.add_frequency(np.nan, timestamp=5.0) + + assert not detector.initialized + assert detector.sum_pos == 0.0 + assert detector.sum_neg == 0.0 + + +class TestPageHinkleyDetector: + """Test cases for Page-Hinkley change point detector.""" + + def test_initialization(self): + """Test Page-Hinkley detector initializes correctly.""" + detector = SelfTuningPageHinkleyDetector( + window_size=10, shared_resources=None, show_init=False + ) + assert detector.window_size == 10 + assert detector.cumulative_sum_pos == 0.0 + assert detector.cumulative_sum_neg == 0.0 + + def test_reference_mean_update(self): + """Test that reference mean updates with new samples.""" + detector = SelfTuningPageHinkleyDetector( + window_size=10, shared_resources=None, show_init=False + ) + + # Add samples + detector.add_frequency(0.5, timestamp=0.0) + assert detector.reference_mean == 0.5 + + detector.add_frequency(1.0, timestamp=1.0) + assert abs(detector.reference_mean - 0.75) < 0.001 + + def test_detects_change(self): + """Test detection of frequency change.""" + detector = SelfTuningPageHinkleyDetector( + window_size=10, shared_resources=None, show_init=False + ) + + # Establish baseline + for i in range(5): + detector.add_frequency(0.1, timestamp=float(i)) + + # Introduce shift + change_detected = False + for i in range(5, 20): + detected, _, _ = detector.add_frequency(1.0, timestamp=float(i)) + if detected: + change_detected = True + break + + assert change_detected + + def test_reset_functionality(self): + """Test reset functionality.""" + detector = SelfTuningPageHinkleyDetector( + window_size=10, shared_resources=None, show_init=False + ) + + # Add samples and accumulate state + for i in range(5): + detector.add_frequency(0.5, timestamp=float(i)) + + # Reset with new frequency + detector.reset(current_freq=1.0) + + assert detector.cumulative_sum_pos == 0.0 + assert detector.cumulative_sum_neg == 0.0 + assert detector.reference_mean == 1.0 + + +class TestDetectorIntegration: + """Integration tests for change point detectors.""" + + def test_all_detectors_handle_empty_input(self): + 
"""Test all detectors handle edge cases gracefully.""" + adwin = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + cusum = CUSUMDetector(window_size=50, shared_resources=None, show_init=False) + ph = SelfTuningPageHinkleyDetector( + window_size=10, shared_resources=None, show_init=False + ) + + # Test with zero frequency + pred = create_mock_prediction(freq=0.0, t_start=0, t_end=1) + + result_adwin = adwin.add_prediction(pred, timestamp=1.0) + result_cusum = cusum.add_frequency(0.0, timestamp=1.0) + result_ph = ph.add_frequency(0.0, timestamp=1.0) + + # All should handle gracefully (not crash) + assert result_adwin is None + assert result_cusum == (False, {}) + assert result_ph == (False, 0.0, {}) + + def test_all_detectors_consistent_detection(self): + """Test all detectors can detect obvious pattern changes.""" + adwin = ChangePointDetector(delta=0.05, shared_resources=None, show_init=False) + cusum = CUSUMDetector(window_size=50, shared_resources=None, show_init=False) + ph = SelfTuningPageHinkleyDetector( + window_size=10, shared_resources=None, show_init=False + ) + + # Create obvious pattern change: 0.1 Hz -> 10 Hz + low_freq = 0.1 + high_freq = 10.0 + + # Feed low frequency + for i in range(10): + pred = create_mock_prediction(freq=low_freq, t_start=i, t_end=i + 1) + adwin.add_prediction(pred, timestamp=float(i + 1)) + cusum.add_frequency(low_freq, timestamp=float(i + 1)) + ph.add_frequency(low_freq, timestamp=float(i + 1)) + + # Feed high frequency and check for detection + adwin_detected = False + cusum_detected = False + ph_detected = False + + for i in range(10, 30): + pred = create_mock_prediction(freq=high_freq, t_start=i, t_end=i + 1) + + if adwin.add_prediction(pred, timestamp=float(i + 1)) is not None: + adwin_detected = True + + detected, _ = cusum.add_frequency(high_freq, timestamp=float(i + 1)) + if detected: + cusum_detected = True + + detected, _, _ = ph.add_frequency(high_freq, timestamp=float(i + 1)) + if detected: + ph_detected = True + + # All detectors should detect such an obvious change + assert adwin_detected or cusum_detected or ph_detected