From 066a5debb92d398db171544fd9db0d29d1096fdf Mon Sep 17 00:00:00 2001
From: ModusMorris
Date: Thu, 27 Feb 2025 10:36:47 +0100
Subject: [PATCH 1/2] Already predicts the gait, but the results are not yet
 meaningful and it still makes mistakes

---
 cnn/data_generator.py     | 112 ++++++-----
 cnn/model_step_counter.py |  90 +++------
 cnn/prediction.py         | 153 ++++++++++----
 cnn/training.py           | 407 ++++++++++++++++++++++----------------
 4 files changed, 442 insertions(+), 320 deletions(-)

diff --git a/cnn/data_generator.py b/cnn/data_generator.py
index e342036..ad2f245 100644
--- a/cnn/data_generator.py
+++ b/cnn/data_generator.py
@@ -5,33 +5,38 @@ from torch.utils.data import Dataset, DataLoader
 import ast
 
-
 def compute_enmo(data):
-    # Calculate the ENMO value for the data
-    norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
-    return np.maximum(norm, 0)  # Set negative values to 0
-
+    # Compute the Euclidean Norm Minus One (ENMO) for the accelerometer data
+    norm = np.sqrt(data["X"]**2 + data["Y"]**2 + data["Z"]**2) - 1
+    return np.maximum(norm, 0)
 
 class StepCounterDataset(Dataset):
-    def __init__(self, left_data, right_data, step_counts, window_size):
-        self.window_size = window_size  # Ensure window_size is assigned
-
-        # Calculate ENMO for both feet
+    def __init__(self, left_data, right_data, step_counts, window_size, gait_vector):
+        """
+        Args:
+            left_data, right_data: Accelerometer data (DataFrame).
+            step_counts: DataFrame with step peaks.
+            window_size: Length of the window.
+            gait_vector: np.array of length 6, e.g., [0,1,0,1,0,0] for gait labels.
+        """
+        self.window_size = window_size
+
+        # --- ENMO, differences, normalization as usual ---
         left_data["ENMO"] = compute_enmo(left_data)
         right_data["ENMO"] = compute_enmo(right_data)
 
-        # Calculate the difference in ENMO values
         left_data["ENMO_DIFF"] = left_data["ENMO"].diff().fillna(0)
         right_data["ENMO_DIFF"] = right_data["ENMO"].diff().fillna(0)
 
-        # Stack the ENMO differences for both feet
-        self.data = np.hstack((left_data[["ENMO_DIFF"]], right_data[["ENMO_DIFF"]]))
+        self.data = np.hstack((
+            left_data[["ENMO_DIFF"]].values,
+            right_data[["ENMO_DIFF"]].values
+        ))
 
-        # Normalize the data
         self.scaler = StandardScaler()
         self.data = self.scaler.fit_transform(self.data)
 
-        # Extract step labels
+        # --- Extract step labels ---
         def extract_peaks(peaks_str):
             if isinstance(peaks_str, str):
                 try:
@@ -40,76 +45,87 @@ def extract_peaks(peaks_str):
                     return []
             return []
 
-        # Extract peaks for left and right feet
         left_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values[0])
         right_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values[0])
 
-        # Create step labels
+        # Step label: 0 or 1 per sample
         self.step_labels = np.zeros(len(self.data), dtype=np.float32)
-
-        # Shift step labels to improve peak positions for CNN
         for p in left_peaks + right_peaks:
+            # Small offset to center the peak in the window
             if 0 <= p < len(self.step_labels) - (window_size // 2):
                 self.step_labels[p + (window_size // 2)] = 1
 
-        # Debugging information
-        print("\n==== Debugging Step Extraction ====")
-        print("Step data (first few rows):")
-        print(step_counts.head())
-
-        print("\nExtraction of peaks for the left foot:")
-        print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values)
-        print("Extracted peaks:", left_peaks)
-
-        print("\nExtraction of peaks for the right foot:")
-        print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "right_foot_index", 
"Peaks"].values) - print("Extracted peaks:", right_peaks) - - print("\nTotal peaks found: Left =", len(left_peaks), ", Right =", len(right_peaks)) - print("==================================\n") + # --- Gait label: 6-dimensional vector --- + # Since the gait is the same for the entire dataset, we store it + # (and will repeat it for each window). + self.gait_label = gait_vector.astype(np.float32) # shape (6,) def __len__(self): return len(self.data) - self.window_size def __getitem__(self, idx): + # x shape: (window_size, 2) x = self.data[idx : idx + self.window_size] - y = self.step_labels[idx : idx + self.window_size] - # Data augmentation: Add slight noise to the data + # Step label per sample in the window + y_step = self.step_labels[idx : idx + self.window_size] # shape (window_size,) + + # Gait label is the same for the entire window: + # We create a shape (window_size, 6) that carries self.gait_label everywhere. + y_gait = np.tile(self.gait_label, (self.window_size, 1)) # (window_size, 6) + + # We combine everything into one label: (window_size, 7) + # Column 0 = step label, columns 1..6 = gait + # => y[i,0] = y_step[i], y[i,1:] = y_gait[i] + y = np.zeros((self.window_size, 7), dtype=np.float32) + y[:, 0] = y_step + y[:, 1:] = y_gait + + # Optional data augmentation noise = np.random.normal(0, 0.02, x.shape) x_augmented = x + noise return x_augmented, y - -def load_datasets(folder_path, window_size, batch_size): +def load_datasets(folder_path, window_size, batch_size, gait_info_df): """ - Reads the following files: - (Folder name)_left_acceleration_data.csv, - (Folder name)_right_acceleration_data.csv, - scaled_step_counts.csv - and creates a DataLoader with segments. + Loads data from: + - (Folder name)_left_acceleration_data.csv + - (Folder name)_right_acceleration_data.csv + - scaled_step_counts.csv + Searches the DataFrame `gait_info_df` for the row corresponding to the current folder ID + and constructs a gait label from it. 
""" folder_name = os.path.basename(folder_path) left_file = os.path.join(folder_path, f"{folder_name}_left_acceleration_data.csv") right_file = os.path.join(folder_path, f"{folder_name}_right_acceleration_data.csv") step_file = os.path.join(folder_path, "scaled_step_counts.csv") - # Check if all required files exist if not (os.path.exists(left_file) and os.path.exists(right_file) and os.path.exists(step_file)): print(f"Folder {folder_name}: Missing files, skipping.") return None - # Load data from CSV files + # Load CSVs left_data = pd.read_csv(left_file) right_data = pd.read_csv(right_file) step_counts = pd.read_csv(step_file) - # Check if any of the dataframes are empty if left_data.empty or right_data.empty or step_counts.empty: print(f"Folder {folder_name}: Empty data, skipping.") return None - # Create dataset and DataLoader - dataset = StepCounterDataset(left_data, right_data, step_counts, window_size) - return DataLoader(dataset, batch_size=batch_size, shuffle=True) + # --- Get gait label from gait_info_df --- + # e.g., video_id == folder_name + row = gait_info_df[gait_info_df["video_id"] == folder_name] + if row.empty: + print(f"No row found for {folder_name} in gait_info_df, using 0-label.") + gait_label = np.zeros(6, dtype=np.float32) + else: + # Important: Adjust the order here to match the columns from the CSV + gait_label = row[["langsames_gehen","normales_gehen","laufen", + "frei_mitschwingend","links_in_ht","rechts_in_ht"]].values[0] + # => shape (6,) + + dataset = StepCounterDataset(left_data, right_data, step_counts, + window_size, gait_vector=gait_label) + return DataLoader(dataset, batch_size=batch_size, shuffle=True) \ No newline at end of file diff --git a/cnn/model_step_counter.py b/cnn/model_step_counter.py index 4c76e6c..0728f0e 100644 --- a/cnn/model_step_counter.py +++ b/cnn/model_step_counter.py @@ -1,76 +1,48 @@ import torch.nn as nn import torch.nn.functional as F - class StepCounterCNN(nn.Module): def __init__(self, window_size): - """ - Initializes the StepCounterCNN model. - - Args: - window_size (int): The size of the input window for the time series data. 
- """ super().__init__() + # Everything as before, except the last layer: + self.conv1 = nn.Conv1d(2, 32, kernel_size=7, padding=3) + self.bn1 = nn.BatchNorm1d(32) + self.pool = nn.MaxPool1d(3, stride=2, padding=1) - # First convolutional layer - self.conv1 = nn.Conv1d(2, 32, kernel_size=7, padding=3) # Input channels: 2, Output channels: 32 - self.bn1 = nn.BatchNorm1d(32) # Batch normalization for the first layer - self.pool = nn.MaxPool1d(3, stride=2, padding=1) # Max pooling layer + self.resblock1 = self._make_resblock(32, 64) + self.resblock2 = self._make_resblock(64, 128, stride=2) - # Residual blocks - self.resblock1 = self._make_resblock(32, 64) # First residual block - self.resblock2 = self._make_resblock(64, 128, stride=2) # Second residual block with stride 2 + final_length = window_size // 4 + self.fc1 = nn.Linear(128 * final_length, 64) - # Fully Connected Layers - final_length = window_size // 4 # Calculate the final length after pooling and residual blocks - self.fc1 = nn.Linear(128 * final_length, 64) # First fully connected layer - self.fc2 = nn.Linear(64, 1) # Second fully connected layer - self.sigmoid = nn.Sigmoid() # Sigmoid activation for binary classification - self.dropout = nn.Dropout(0.5) # Dropout for regularization + # IMPORTANT: instead of 1 now 7 outputs (1 for step + 6 for gait types) + self.fc2 = nn.Linear(64, 7) + self.sigmoid = nn.Sigmoid() + self.dropout = nn.Dropout(0.5) def _make_resblock(self, in_channels, out_channels, stride=1): - """ - Creates a residual block with two convolutional layers, batch normalization, and ReLU activation. - - Args: - in_channels (int): Number of input channels. - out_channels (int): Number of output channels. - stride (int): Stride for the first convolutional layer. - - Returns: - nn.Sequential: A sequential container representing the residual block. - """ return nn.Sequential( - nn.Conv1d(in_channels, out_channels, 3, stride=stride, padding=1), # First convolutional layer - nn.BatchNorm1d(out_channels), # Batch normalization - nn.ReLU(), # ReLU activation - nn.Conv1d(out_channels, out_channels, 3, padding=1), # Second convolutional layer - nn.BatchNorm1d(out_channels), # Batch normalization - nn.ReLU(), # ReLU activation - nn.Dropout(0.5), # Dropout for regularization + nn.Conv1d(in_channels, out_channels, 3, stride=stride, padding=1), + nn.BatchNorm1d(out_channels), + nn.ReLU(), + nn.Conv1d(out_channels, out_channels, 3, padding=1), + nn.BatchNorm1d(out_channels), + nn.ReLU(), + nn.Dropout(0.5), ) def forward(self, x): """ - Defines the forward pass of the model. - - Args: - x (torch.Tensor): Input tensor of shape (batch_size, 2, window_size). - - Returns: - torch.Tensor: Output tensor of shape (batch_size, 1) after applying the sigmoid function. 
+ x shape: (batch_size, 2, window_size) + we return shape: (batch_size, 7) """ - # Initial layer - x = self.pool(F.relu(self.bn1(self.conv1(x)))) # Apply convolution, batch norm, ReLU, and pooling - - # Residual blocks - x = self.resblock1(x) # Apply first residual block - x = self.resblock2(x) # Apply second residual block - - # Classification - x = x.flatten(1) # Flatten the tensor for the fully connected layer - x = self.fc1(x) # Apply first fully connected layer - x = F.relu(x) # Apply ReLU activation - x = self.dropout(x) # Apply dropout - x = self.fc2(x) # Apply second fully connected layer - return self.sigmoid(x) # Apply sigmoid activation for binary classification + x = self.pool(F.relu(self.bn1(self.conv1(x)))) + x = self.resblock1(x) + x = self.resblock2(x) + + x = x.flatten(1) # (batch_size, 128*final_length) + x = self.fc1(x) + x = F.relu(x) + x = self.dropout(x) + x = self.fc2(x) + return self.sigmoid(x) \ No newline at end of file diff --git a/cnn/prediction.py b/cnn/prediction.py index 2108d4f..b5ee60c 100644 --- a/cnn/prediction.py +++ b/cnn/prediction.py @@ -7,115 +7,182 @@ import ast from model_step_counter import StepCounterCNN - def load_model(model_path, device, window_size=64): - """Loads the trained model.""" + # Load the model from the specified path and set it to evaluation mode model = StepCounterCNN(window_size) model.load_state_dict(torch.load(model_path, map_location=device)) model.to(device) model.eval() return model - def compute_enmo(data): - """Computes the Euclidean Norm Minus One (ENMO) from accelerometer data.""" - norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1 + # Compute the Euclidean Norm Minus One (ENMO) for the accelerometer data + norm = np.sqrt(data["X"]**2 + data["Y"]**2 + data["Z"]**2) - 1 return np.maximum(norm, 0) - def process_data(left_csv, right_csv): - """Loads and processes acceleration data from left and right foot CSV files.""" + # Load and process the left and right accelerometer data left_df = pd.read_csv(left_csv) right_df = pd.read_csv(right_csv) + return pd.DataFrame({ + "ENMO_left": compute_enmo(left_df), + "ENMO_right": compute_enmo(right_df) + }) + +def detect_multi_label(model, device, data, window_size=64): + """ + Returns per frame: + - step_prob[frame] = model step probability + - gait_probs[frame,0..5] = model gait probabilities + We average overlapping windows as in the old detect_steps(). 
+ """ + data_torch = torch.tensor(StandardScaler().fit_transform(data), dtype=torch.float32, device=device) + n = len(data_torch) + + step_sum = np.zeros(n, dtype=np.float32) + step_cnt = np.zeros(n, dtype=np.float32) + gait_sum = np.zeros((n, 6), dtype=np.float32) + gait_cnt = np.zeros((n, 6), dtype=np.float32) - return pd.DataFrame({"ENMO_left": compute_enmo(left_df), "ENMO_right": compute_enmo(right_df)}) + with torch.no_grad(): + for start in range(n - window_size): + window = data_torch[start : start+window_size].T.unsqueeze(0) # shape (1,2,window_size) + out = model(window) # shape (1,7) + out_np = out[0].cpu().numpy() # shape (7,) + step_val = out_np[0] + gait_vals = out_np[1:] # shape (6,) -def detect_steps(model, device, data, window_size=64): - """Runs the step detection model on the given data.""" - data = torch.tensor(StandardScaler().fit_transform(data), dtype=torch.float32, device=device) - frame_probs = np.zeros(len(data), dtype=np.float32) - overlap_cnt = np.zeros(len(data), dtype=np.float32) + # Distribute values to all indices of the window + step_sum[start : start+window_size] += step_val + step_cnt[start : start+window_size] += 1 - with torch.no_grad(): - for start in range(len(data) - window_size): - window = data[start : start + window_size].T.unsqueeze(0) - frame_probs[start : start + window_size] += model(window).cpu().numpy().flatten() - overlap_cnt[start : start + window_size] += 1 + gait_sum[start : start+window_size, :] += gait_vals + gait_cnt[start : start+window_size, :] += 1 - frame_probs[overlap_cnt > 0] /= overlap_cnt[overlap_cnt > 0] - return find_peaks(frame_probs, height=0.02, distance=30, prominence=0.05)[0] + # Compute averages + mask_step = step_cnt > 0 + step_sum[mask_step] /= step_cnt[mask_step] + mask_gait = gait_cnt > 0 + gait_sum[mask_gait] /= gait_cnt[mask_gait] + + return step_sum, gait_sum # shape(n,) & shape(n,6) def parse_groundtruth_steps(groundtruth_csv): - """Parses the ground truth step data from CSV.""" - groundtruth_df = pd.read_csv(groundtruth_csv, nrows=2) # Only consider the first two rows + # Parse ground truth steps from the CSV file + groundtruth_df = pd.read_csv(groundtruth_csv, nrows=2) steps = set() for peak_str in groundtruth_df["Peaks"].dropna(): try: steps.update(ast.literal_eval(peak_str)) - except (SyntaxError, ValueError): - continue + except: + pass return steps +def plot_results(data, step_probs, gait_probs, detected_steps, groundtruth_steps): + """ + Plotly visualization: + - Data (ENMO_left / ENMO_right) + - step_probs + - detected_steps (as markers) + - groundtruth_steps (as markers) + - a single 'best gait' per frame (via argmax) + """ + # 1) Determine the single best gait per frame by argmax + gait_names = ["langsames_gehen","normales_gehen","laufen", + "frei_mitschwingend","links_in_ht","rechts_in_ht"] + predicted_gait_index = np.argmax(gait_probs, axis=1) + # For display as text + predicted_gait_labels = [gait_names[idx] for idx in predicted_gait_index] -def plot_results(data, detected_steps, groundtruth_steps): - """Generates an interactive Plotly visualization of acceleration data, detected steps, and ground truth.""" fig = go.Figure() time_axis = np.arange(len(data)) - # Plot acceleration data - for col in data.columns: - fig.add_trace(go.Scatter(x=time_axis, y=data[col], mode="lines", name=col)) + # 2) Plot ENMO Left / Right + fig.add_trace(go.Scatter(x=time_axis, y=data["ENMO_left"], mode="lines", name="ENMO_left")) + fig.add_trace(go.Scatter(x=time_axis, y=data["ENMO_right"], mode="lines", 
name="ENMO_right")) + + # 3) Plot step probability + fig.add_trace(go.Scatter(x=time_axis, y=step_probs, mode="lines", name="Step Probability", line=dict(color="red"))) - # Plot detected steps + # 4) Detected steps (index, y=...) fig.add_trace( go.Scatter( x=list(detected_steps), - y=[data.iloc[i].mean() for i in detected_steps], + y=[step_probs[i] for i in detected_steps], mode="markers", name=f"Detected Steps ({len(detected_steps)})", marker=dict(color="red", size=8), ) ) - # Plot ground truth steps + # 5) Ground Truth Steps fig.add_trace( go.Scatter( x=list(groundtruth_steps), - y=[data.iloc[i].mean() for i in groundtruth_steps], + y=[step_probs[i] for i in groundtruth_steps], mode="markers", name=f"Ground Truth Steps ({len(groundtruth_steps)})", marker=dict(color="green", symbol="x", size=8), ) ) + # 6) Single best gait per frame: we plot the index or label + # Here, we'll plot the index as y-values and show the label as text on hover. + # You can also store the label as discrete categories if desired. + fig.add_trace( + go.Scatter( + x=time_axis, + y=predicted_gait_index, + mode="markers", + name="Predicted Gait (argmax)", + text=predicted_gait_labels, + textposition="top center", + marker=dict(color="blue", size=5) + ) + ) + fig.update_layout( - title="Step Detection Visualization", + title="Steps and Single Best Gait", xaxis_title="Frame", - yaxis_title="Acceleration / Probability", + yaxis_title="Probability / Gait Index", legend_title="Legend", template="plotly_white", ) fig.show() + # If you want to print just the final selected gait (e.g., majority over the entire sequence), + # you could do something like this: + majority_gait_idx = np.bincount(predicted_gait_index).argmax() + majority_gait_label = gait_names[majority_gait_idx] + print(f"Detected {len(detected_steps)} steps in total.") + print(f"Overall predicted gait (majority): {majority_gait_label}") def main(model_path, left_csv, right_csv, groundtruth_csv): - """Runs the full step detection pipeline and visualization.""" device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = load_model(model_path, device) + + # Load and process data data = process_data(left_csv, right_csv) - detected_steps = detect_steps(model, device, data) + + # Step & gait probabilities per frame + step_probs, gait_probs = detect_multi_label(model, device, data, window_size=64) + + # Detected steps (small peak detector on step_probs) + idx_peaks = find_peaks(step_probs, height=0.02, distance=30, prominence=0.05)[0] + + # Ground truth groundtruth_steps = parse_groundtruth_steps(groundtruth_csv) - plot_results(data, detected_steps, groundtruth_steps) + # Final plot: only 1 gait (via argmax) + total steps + plot_results(data, step_probs, gait_probs, idx_peaks, groundtruth_steps) if __name__ == "__main__": - model_path = "D:/Daisy/5. Semester/SmartHealth/Step-counter/cnn/best_model.pth" - left_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_left_acceleration_data.csv" - right_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_right_acceleration_data.csv" - groundtruth_csv = "D:/Daisy/5. 
Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/scaled_step_counts.csv" - + model_path = "best_model.pth" + left_csv = "path_to_left.csv" + right_csv = "path_to_right.csv" + groundtruth_csv = "path_to_step_counts.csv" main(model_path, left_csv, right_csv, groundtruth_csv) diff --git a/cnn/training.py b/cnn/training.py index cac1645..0232662 100644 --- a/cnn/training.py +++ b/cnn/training.py @@ -1,3 +1,4 @@ +from prediction import main as prediction import os import torch import torch.optim as optim @@ -10,244 +11,310 @@ from data_generator import load_datasets from model_step_counter import StepCounterCNN from torch.utils.data import DataLoader, ConcatDataset, Subset -from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc -from prediction import main as prediction - +from sklearn.metrics import ( + classification_report, + confusion_matrix, + roc_curve, + auc, + multilabel_confusion_matrix +) +import pandas as pd + +# ========================================== +# Helper Classes & Functions +# ========================================== +class EarlyStopping: + def __init__(self, patience=4, min_delta=0.005, path="best_model.pth"): + self.patience = patience + self.min_delta = min_delta + self.path = path + self.best_loss = float("inf") + self.counter = 0 + self.best_epoch = 0 + self.best_train_loss = float("inf") -def load_all_datasets(root_folder, window_size, batch_size): - """ - Loads all datasets from subfolders in the given root folder. + def check(self, train_loss, val_loss, model, epoch): + if val_loss < self.best_loss - self.min_delta: + self.best_loss = val_loss + self.best_train_loss = train_loss + self.best_epoch = epoch + self.counter = 0 + torch.save(model.state_dict(), self.path) + elif abs(train_loss - val_loss) > self.min_delta and val_loss >= self.best_loss: + print("Early stopping triggered due to overfitting!") + model.load_state_dict(torch.load(self.path)) + return True + else: + self.counter += 1 + if self.counter >= self.patience: + print(f"Early stopping triggered! Best model from epoch {self.best_epoch + 1} loaded.") + model.load_state_dict(torch.load(self.path)) + return True + return False - Parameters: - root_folder (str): Path to the root directory containing dataset folders. - window_size (int): Number of samples per window for the model. - batch_size (int): Number of samples per batch. +def split_dataset(dataset, ratio=0.2): + train_idx, test_idx = train_test_split(np.arange(len(dataset)), test_size=ratio, random_state=42) + print(f"Train samples: {len(train_idx)}, Test samples: {len(test_idx)}") + return Subset(dataset, train_idx), Subset(dataset, test_idx) - Returns: - ConcatDataset: Combined dataset from all subfolders. 
- """ +def load_all_datasets(root_folder, window_size, batch_size, gait_info_df): + """Extended loading of all folders, passing gait_info_df to load_datasets().""" subfolders = [f.path for f in os.scandir(root_folder) if f.is_dir()] if not subfolders: print("No folders found in", root_folder) return None - # Load datasets from each subfolder - all_data_loaders = [ - load_datasets(sf, window_size, batch_size).dataset - for sf in subfolders - if load_datasets(sf, window_size, batch_size) is not None - ] - if not all_data_loaders: + all_datasets = [] + for sf in subfolders: + dl = load_datasets(sf, window_size, batch_size, gait_info_df) # <-- adjusted in data_generator + if dl is not None: + all_datasets.append(dl.dataset) + + if not all_datasets: print("No datasets available!") return None - # Combine all datasets into one - combined = ConcatDataset(all_data_loaders) - print(f"{len(all_data_loaders)} datasets, total: {len(combined)} samples.") + combined = ConcatDataset(all_datasets) + print(f"{len(all_datasets)} datasets, total: {len(combined)} samples.") return combined +# ========================================== +# Main Training Function +# ========================================== +def train_step_counter( + root_folder, + window_size=256, + batch_size=32, + epochs=5, + lr=0.001, + patience=4, + gait_csv="D:/Step-counter/Data/acceleration_metadata.csv" +): + # 1) Load CSV with gait information + gait_info_df = pd.read_csv(gait_csv) + + # 2) Load dataset + combined_dataset = load_all_datasets(root_folder, window_size, batch_size, gait_info_df) + if combined_dataset is None: + return None, None, None -def split_dataset(dataset, ratio=0.2): - """ - Splits the dataset into training and testing sets. + # 3) Split into training/validation + train_ds, test_ds = split_dataset(combined_dataset, ratio=0.2) + train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True) + test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False) - Parameters: - dataset (ConcatDataset): The dataset to be split. - ratio (float): The proportion of data to be used for testing. Default is 0.2 (20%). + # 4) Define model, optimizer, loss + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model = StepCounterCNN(window_size).to(device).float() - Returns: - Tuple[Subset, Subset]: Training and testing dataset subsets. - """ - train_idx, test_idx = train_test_split(np.arange(len(dataset)), test_size=ratio, random_state=42) - print(f"Train samples: {len(train_idx)}, Test samples: {len(test_idx)}") - return Subset(dataset, train_idx), Subset(dataset, test_idx) + # For multi-label: BCE loss without logits, as we already have sigmoid in the model + # For step, additionally a weight if you have imbalance + # Here a simplified example (without special weights) + criterion = nn.BCELoss() + optimizer = optim.Adam(model.parameters(), lr=lr) + # EarlyStopping & loss lists + early_stopping = EarlyStopping(patience=patience) + train_losses, test_losses = [], [] -class EarlyStopping: - def __init__(self, patience=4, min_delta=0.005, path="best_model.pth"): - """ - Initializes the EarlyStopping mechanism. - - Args: - patience (int): Number of epochs to wait for improvement before stopping. - min_delta (float): Minimum improvement in validation loss to be considered significant. - path (str): File path where the best model will be saved. 
- """ - self.patience = patience # Number of epochs with no improvement before stopping - self.min_delta = min_delta # Minimum required change in validation loss - self.path = path # Path to save the best model - self.best_loss = float("inf") # Initialize best validation loss as infinity - self.counter = 0 # Counter to track epochs without improvement - self.best_epoch = 0 # Stores the epoch with the best validation loss - self.best_train_loss = float("inf") # Stores the training loss of the best model + # 5) Training + for ep in range(epochs): + model.train() + ep_loss = 0.0 - def check(self, train_loss, val_loss, model, epoch): - """ - Checks whether training should stop early based on validation loss. + for X, Y in tqdm(train_loader, desc=f"Epoch {ep+1}/{epochs}"): + # X shape: (batch, window_size, 2) => (batch,2,window_size) + X = X.permute(0,2,1).float().to(device) + Y = Y.float().to(device) # shape: (batch, window_size, 7) - Args: - train_loss (float): Current training loss. - val_loss (float): Current validation loss. - model (torch.nn.Module): The PyTorch model being trained. - epoch (int): The current epoch number. + # 1) Step label + y_step = Y[:,:,0].max(dim=1).values.unsqueeze(1) # (batch,1) - Returns: - bool: True if training should stop, False otherwise. - """ + # 2) Gait label + y_gait = Y[:,0,1:] # (batch,6) - # If the validation loss improves significantly, save the model - if val_loss < self.best_loss - self.min_delta: - self.best_loss = val_loss # Update best validation loss - self.best_train_loss = train_loss # Store corresponding training loss - self.best_epoch = epoch # Store epoch number of the best model - self.counter = 0 # Reset counter since there was an improvement - torch.save(model.state_dict(), self.path) # Save the best model checkpoint + optimizer.zero_grad() + out = model(X) # (batch,7) - # Check if overfitting occurs (training loss is much lower than validation loss) - elif abs(train_loss - val_loss) > self.min_delta and val_loss >= self.best_loss: - print("Early stopping triggered due to overfitting!") # Print warning - model.load_state_dict(torch.load(self.path)) # Load the best saved model - return True # Stop training + # => pred_step shape (batch,1), pred_gait shape (batch,6) + pred_step = out[:,0].unsqueeze(1) + pred_gait = out[:,1:] - else: - # No significant improvement, increment counter - self.counter += 1 + loss_step = criterion(pred_step, y_step) + loss_gait = criterion(pred_gait, y_gait) + loss = loss_step + loss_gait + loss.backward() + optimizer.step() - # If patience limit is reached, stop training - if self.counter >= self.patience: - print(f"Early stopping triggered! 
Best model from epoch {self.best_epoch + 1} loaded from {self.path}") - model.load_state_dict(torch.load(self.path)) # Load the best model - return True # Stop training + ep_loss += loss.item() - return False # Continue training + train_loss = ep_loss / len(train_loader) + train_losses.append(train_loss) + # 6) Validation + model.eval() + val_loss = 0.0 + with torch.no_grad(): + for X_val, Y_val in test_loader: + X_val = X_val.permute(0,2,1).float().to(device) + Y_val = Y_val.float().to(device) -def train_step_counter(root_folder, window_size=256, batch_size=32, epochs=5, lr=0.001, patience=4): - combined_dataset = load_all_datasets(root_folder, window_size, batch_size) - if combined_dataset is None: - return None, None, None + y_step_val = Y_val[:,:,0].max(dim=1).values.unsqueeze(1) + y_gait_val = Y_val[:,0,1:] - train_ds, test_ds = split_dataset(combined_dataset) - train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True) - test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False) + out_val = model(X_val) + pred_step_val = out_val[:,0].unsqueeze(1) + pred_gait_val = out_val[:,1:] - device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - model = StepCounterCNN(window_size).to(device).float() + loss_step_val = criterion(pred_step_val, y_step_val) + loss_gait_val = criterion(pred_gait_val, y_gait_val) + val_loss += (loss_step_val + loss_gait_val).item() - criterion = nn.BCELoss(weight=torch.tensor([5.0], device=device).float()) - optimizer = optim.Adam(model.parameters(), lr=lr) - train_losses, test_losses = [], [] - early_stopping = EarlyStopping(patience=patience) + val_loss /= len(test_loader) + test_losses.append(val_loss) - model.train() - for ep in range(epochs): - ep_loss = 0.0 - with tqdm(total=len(train_loader), desc=f"Epoch {ep+1}/{epochs}") as pbar: - for X, Y in train_loader: - X, Y = X.float().to(device).permute(0, 2, 1), Y.float().to(device).max(dim=1, keepdim=True)[0] - optimizer.zero_grad() - loss = criterion(model(X), Y) - loss.backward() - optimizer.step() - ep_loss += loss.item() - pbar.update(1) - - train_losses.append(ep_loss / len(train_loader)) + print(f"Epoch {ep+1}: train_loss={train_loss:.4f} val_loss={val_loss:.4f}") - model.eval() - with torch.no_grad(): - test_loss = sum( - criterion( - model(X.float().to(device).permute(0, 2, 1)), Y.float().to(device).max(dim=1, keepdim=True)[0] - ).item() - for X, Y in test_loader - ) / len(test_loader) - test_losses.append(test_loss) - print(f"\U0001F535 Epoch {ep+1}, Train Loss: {train_losses[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}") - - if early_stopping.check(train_losses[-1], test_losses[-1], model, ep): + # Early Stopping + if early_stopping.check(train_loss, val_loss, model, ep): break + # 7) Plot training progress plt.plot(train_losses, label="Training Loss") - plt.plot(test_losses, label="Test Loss", linestyle="dashed") + plt.plot(test_losses, label="Validation Loss", linestyle="dashed") plt.xlabel("Epoch") plt.ylabel("Loss") plt.legend() plt.grid() plt.show() - print(f"The model was saved at epoch {early_stopping.best_epoch + 1}.") + print(f"Best model was saved from epoch {early_stopping.best_epoch + 1}.") return model, test_loader, device - +# ========================================== +# Evaluate Function with Multi-Label +# ========================================== def evaluate_model(model, test_loader, device): """ - Evaluates the trained model using a test dataset and generates performance metrics. 
- - Parameters: - model (torch.nn.Module): Trained model to be evaluated. - test_loader (DataLoader): DataLoader for the test dataset. - device (torch.device): The device (CPU/GPU) on which evaluation is performed. - - Outputs: - - Prints a classification report. - - Displays a confusion matrix. - - Roc Plot + Evaluates the trained model on: + - Steps (binary) + - 6 Gait labels (multi-label) + Then prints classification reports and confusion matrices, + plus ROC curves for each label. """ model.eval() - y_true, y_pred, y_scores = [], [], [] + + all_step_true, all_step_pred, all_step_prob = [], [], [] + all_gait_true, all_gait_pred, all_gait_prob = [], [], [] + with torch.no_grad(): for X, Y in test_loader: - X, Y = X.float().to(device).permute(0, 2, 1), Y.float().to(device).max(dim=1, keepdim=True)[0] - outputs = model(X).cpu().numpy() - predictions = (outputs > 0.5).astype(int) - y_true.extend(Y.cpu().numpy().flatten()) - y_pred.extend(predictions.flatten()) - y_scores.extend(outputs.flatten()) - - print("Classification Report:") - print(classification_report(y_true, y_pred)) - - cm = confusion_matrix(y_true, y_pred) - plt.figure(figsize=(6, 5)) - sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["No Step", "Step"], yticklabels=["No Step", "Step"]) + X = X.permute(0,2,1).float().to(device) + Y = Y.float().to(device) # (batch, window_size, 7) + + # True labels + y_step = Y[:,:,0].max(dim=1).values # (batch,) + y_gait = Y[:,0,1:] # (batch,6) + + out = model(X) # (batch,7) + pred_step = out[:,0] # (batch,) + pred_gait = out[:,1:]# (batch,6) + + # Save for later metrics + all_step_true.append(y_step.cpu().numpy()) # shape (batch,) + all_step_prob.append(pred_step.cpu().numpy()) # shape (batch,) + all_step_pred.append((pred_step>0.5).cpu().numpy()) # shape (batch,) + + all_gait_true.append(y_gait.cpu().numpy()) # shape (batch,6) + all_gait_prob.append(pred_gait.cpu().numpy()) # shape (batch,6) + all_gait_pred.append((pred_gait>0.5).cpu().numpy()) # shape (batch,6) + + # Combine into NumPy arrays + all_step_true = np.concatenate(all_step_true, axis=0) + all_step_prob = np.concatenate(all_step_prob, axis=0) + all_step_pred = np.concatenate(all_step_pred, axis=0) + + all_gait_true = np.concatenate(all_gait_true, axis=0) # (N,6) + all_gait_prob = np.concatenate(all_gait_prob, axis=0) # (N,6) + all_gait_pred = np.concatenate(all_gait_pred, axis=0) # (N,6) + + # --- 1) Steps (binary classification) --- + print("\n=== Steps (Binary) ===") + print(classification_report(all_step_true, all_step_pred, target_names=["No Step","Step"])) + + cm = confusion_matrix(all_step_true, all_step_pred) + plt.figure(figsize=(5,4)) + sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["No Step","Step"], yticklabels=["No Step","Step"]) + plt.title("Confusion Matrix (Steps)") plt.xlabel("Predicted") - plt.ylabel("Actual") - plt.title("Confusion Matrix") + plt.ylabel("True") plt.show() - # ROC Curve - fpr, tpr, _ = roc_curve(y_true, y_scores) - roc_auc = auc(fpr, tpr) - - plt.figure(figsize=(6, 5)) - plt.plot(fpr, tpr, color="blue", lw=2, label=f"ROC curve (area = {roc_auc:.2f})") - plt.plot([0, 1], [0, 1], color="gray", linestyle="dashed") + # ROC curve for steps + fpr, tpr, _ = roc_curve(all_step_true, all_step_prob) + step_auc = auc(fpr, tpr) + plt.figure() + plt.plot(fpr, tpr, label=f"Step AUC={step_auc:.2f}") + plt.plot([0,1],[0,1], "--", color="gray") plt.xlabel("False Positive Rate") plt.ylabel("True Positive Rate") - plt.title("ROC Curve") - plt.legend(loc="lower right") + plt.title("ROC 
Curve (Steps)")
+    plt.legend()
     plt.grid()
     plt.show()
 
+    # --- 2) Gait types (multi-label) ---
+    gait_names = ["langsames_gehen","normales_gehen","laufen",
+                  "frei_mitschwingend","links_in_ht","rechts_in_ht"]
+
+    print("\n=== Gait Types (Multi-Label) ===")
+    print("-> Classification report per label:")
+    print(classification_report(all_gait_true, all_gait_pred, target_names=gait_names))
+
+    # Multi-label confusion matrix (each row = own 2x2)
+    ml_cms = multilabel_confusion_matrix(all_gait_true, all_gait_pred)
+    for i, label_name in enumerate(gait_names):
+        cm_i = ml_cms[i]
+        plt.figure(figsize=(5,4))
+        sns.heatmap(cm_i, annot=True, fmt="d", cmap="Blues")
+        plt.title(f"Confusion Matrix: {label_name}")
+        plt.xlabel("Predicted")
+        plt.ylabel("True")
+        plt.show()
+
+    # ROC curves for each gait type
+    plt.figure()
+    for i, label_name in enumerate(gait_names):
+        fpr_i, tpr_i, _ = roc_curve(all_gait_true[:,i], all_gait_prob[:,i])
+        auc_i = auc(fpr_i, tpr_i)
+        plt.plot(fpr_i, tpr_i, label=f"{label_name} (AUC={auc_i:.2f})")
+    plt.plot([0,1],[0,1],"--",color="gray")
+    plt.xlabel("False Positive Rate")
+    plt.ylabel("True Positive Rate")
+    plt.title("ROC Curves (Gaits)")
+    plt.legend()
+    plt.grid()
+    plt.show()
 
 def main():
-    root_folder = "D:\Step-counter\Output"
+    # Example paths
+    root_folder = r"D:\Step-counter\Output"
     window_size = 64
     batch_size = 128
     epochs = 20
+    lr = 1e-3
 
-    model, test_loader, device = train_step_counter(root_folder, window_size, batch_size, epochs, 1e-3)
+    model, test_loader, device = train_step_counter(
+        root_folder, window_size, batch_size, epochs, lr
+    )
     if model is not None and test_loader is not None:
         evaluate_model(model, test_loader, device)
     model_path = "best_model.pth"
-    left_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_left_acceleration_data.csv"
-    right_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_right_acceleration_data.csv"
-    stepcount_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/scaled_step_counts.csv"
+    left_csv = r"D:\Step-counter\Output\GX010061\GX010061_left_acceleration_data.csv"
+    right_csv = r"D:\Step-counter\Output\GX010061\GX010061_right_acceleration_data.csv"
+    stepcount_csv = r"D:\Step-counter\Output\GX010061\scaled_step_counts.csv"
     prediction(model_path, left_csv, right_csv, stepcount_csv)
-
 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file

From ce9d5c46de061dda7a97f75a980e99d753bf1d98 Mon Sep 17 00:00:00 2001
From: ModusMorris
Date: Thu, 27 Feb 2025 10:54:30 +0100
Subject: [PATCH 2/2] better plot results

---
 cnn/prediction.py | 135 +++++++++++++++++++++++++++------------------
 cnn/training.py   |   2 +-
 2 files changed, 81 insertions(+), 56 deletions(-)

diff --git a/cnn/prediction.py b/cnn/prediction.py
index b5ee60c..ed51ec2 100644
--- a/cnn/prediction.py
+++ b/cnn/prediction.py
@@ -83,83 +83,108 @@ def parse_groundtruth_steps(groundtruth_csv):
 def plot_results(data, step_probs, gait_probs, detected_steps, groundtruth_steps):
     """
     Plotly visualization:
-    - Data (ENMO_left / ENMO_right)
-    - step_probs
-    - detected_steps (as markers)
-    - groundtruth_steps (as markers)
-    - a single 'best gait' per frame (via argmax)
+    - ENMO_left / ENMO_right as lines (left axis).
+    - Step probability (0..1) as a red line on the left axis.
+    - Detected steps as red markers, ground truth steps as green markers. 
+ - Predicted gait on a SECOND y-axis (right side), showing category labels instead of 0..5. """ # 1) Determine the single best gait per frame by argmax gait_names = ["langsames_gehen","normales_gehen","laufen", "frei_mitschwingend","links_in_ht","rechts_in_ht"] predicted_gait_index = np.argmax(gait_probs, axis=1) - # For display as text - predicted_gait_labels = [gait_names[idx] for idx in predicted_gait_index] fig = go.Figure() time_axis = np.arange(len(data)) - # 2) Plot ENMO Left / Right - fig.add_trace(go.Scatter(x=time_axis, y=data["ENMO_left"], mode="lines", name="ENMO_left")) - fig.add_trace(go.Scatter(x=time_axis, y=data["ENMO_right"], mode="lines", name="ENMO_right")) - - # 3) Plot step probability - fig.add_trace(go.Scatter(x=time_axis, y=step_probs, mode="lines", name="Step Probability", line=dict(color="red"))) - - # 4) Detected steps (index, y=...) - fig.add_trace( - go.Scatter( - x=list(detected_steps), - y=[step_probs[i] for i in detected_steps], - mode="markers", - name=f"Detected Steps ({len(detected_steps)})", - marker=dict(color="red", size=8), - ) - ) - - # 5) Ground Truth Steps - fig.add_trace( - go.Scatter( - x=list(groundtruth_steps), - y=[step_probs[i] for i in groundtruth_steps], - mode="markers", - name=f"Ground Truth Steps ({len(groundtruth_steps)})", - marker=dict(color="green", symbol="x", size=8), - ) - ) - - # 6) Single best gait per frame: we plot the index or label - # Here, we'll plot the index as y-values and show the label as text on hover. - # You can also store the label as discrete categories if desired. - fig.add_trace( - go.Scatter( - x=time_axis, - y=predicted_gait_index, - mode="markers", - name="Predicted Gait (argmax)", - text=predicted_gait_labels, - textposition="top center", - marker=dict(color="blue", size=5) - ) - ) - + # ENMO signals on the left axis + fig.add_trace(go.Scatter( + x=time_axis, + y=data["ENMO_left"], + mode="lines", + name="ENMO_left", + yaxis="y1" + )) + fig.add_trace(go.Scatter( + x=time_axis, + y=data["ENMO_right"], + mode="lines", + name="ENMO_right", + yaxis="y1" + )) + + # Step probability on the left axis as well + fig.add_trace(go.Scatter( + x=time_axis, + y=step_probs, + mode="lines", + name="Step Probability", # the CNN output for steps + line=dict(color="red"), + yaxis="y1" + )) + + # Detected steps (red markers) on the left axis + fig.add_trace(go.Scatter( + x=list(detected_steps), + y=[step_probs[i] for i in detected_steps], + mode="markers", + name=f"Detected Steps ({len(detected_steps)})", + marker=dict(color="red", size=8), + yaxis="y1" + )) + + # Ground truth steps (green markers) on the left axis + fig.add_trace(go.Scatter( + x=list(groundtruth_steps), + y=[step_probs[i] for i in groundtruth_steps], + mode="markers", + name=f"Ground Truth Steps ({len(groundtruth_steps)})", + marker=dict(color="green", symbol="x", size=8), + yaxis="y1" + )) + + # Plot predicted gait on a SECOND y-axis, showing index as numeric + # but we will map them to category labels with tickvals/ticktext below + fig.add_trace(go.Scatter( + x=time_axis, + y=predicted_gait_index, + mode="markers", + name="Predicted Gait", + marker=dict(color="blue", size=6), + yaxis="y2" # <--- second axis + )) + + # Layout with two y-axes fig.update_layout( title="Steps and Single Best Gait", - xaxis_title="Frame", - yaxis_title="Probability / Gait Index", + xaxis=dict(title="Frame"), + yaxis=dict( + title="Acceleration / Probability", + side="left", + range=[0, max(step_probs.max(), data["ENMO_left"].max(), data["ENMO_right"].max())+0.5], + ), + # y2: 
the axis for the gait index + yaxis2=dict( + title="Predicted Gait (Categories)", + overlaying="y", # shares the same x-axis + side="right", + tickmode="array", + tickvals=[0,1,2,3,4,5], # indices 0..5 + ticktext=gait_names, # the actual labels + range=[-0.5, 5.5] + ), legend_title="Legend", template="plotly_white", ) fig.show() - # If you want to print just the final selected gait (e.g., majority over the entire sequence), - # you could do something like this: + # Majority vote on the entire sequence majority_gait_idx = np.bincount(predicted_gait_index).argmax() majority_gait_label = gait_names[majority_gait_idx] print(f"Detected {len(detected_steps)} steps in total.") print(f"Overall predicted gait (majority): {majority_gait_label}") + def main(model_path, left_csv, right_csv, groundtruth_csv): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = load_model(model_path, device) diff --git a/cnn/training.py b/cnn/training.py index 0232662..b7d0f04 100644 --- a/cnn/training.py +++ b/cnn/training.py @@ -83,7 +83,7 @@ def load_all_datasets(root_folder, window_size, batch_size, gait_info_df): # ========================================== def train_step_counter( root_folder, - window_size=256, + window_size=64, batch_size=32, epochs=5, lr=0.001,