diff --git a/cnn/best_model.pth b/cnn/best_model.pth
new file mode 100644
index 0000000..9efc2b3
Binary files /dev/null and b/cnn/best_model.pth differ
diff --git a/cnn/data_generator.py b/cnn/data_generator.py
index 75b0324..e342036 100644
--- a/cnn/data_generator.py
+++ b/cnn/data_generator.py
@@ -7,9 +7,9 @@


 def compute_enmo(data):
-    # calculate ENMNO for data
+    # Calculate the ENMO value for the data
     norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
-    return np.maximum(norm, 0)  # Negative Werte auf 0 setzen
+    return np.maximum(norm, 0)  # Set negative values to 0


 class StepCounterDataset(Dataset):
@@ -20,13 +20,14 @@ def __init__(self, left_data, right_data, step_counts, window_size):
         left_data["ENMO"] = compute_enmo(left_data)
         right_data["ENMO"] = compute_enmo(right_data)

+        # Calculate the difference in ENMO values
         left_data["ENMO_DIFF"] = left_data["ENMO"].diff().fillna(0)
         right_data["ENMO_DIFF"] = right_data["ENMO"].diff().fillna(0)

-        # ENMO compare for data
+        # Stack the ENMO differences for both feet
         self.data = np.hstack((left_data[["ENMO_DIFF"]], right_data[["ENMO_DIFF"]]))

-        # Normalize data
+        # Normalize the data
         self.scaler = StandardScaler()
         self.data = self.scaler.fit_transform(self.data)
@@ -39,26 +40,29 @@ def extract_peaks(peaks_str):
                 return []
             return []

+        # Extract peaks for left and right feet
        left_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values[0])
        right_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values[0])

-        # Create labels
+        # Create step labels
         self.step_labels = np.zeros(len(self.data), dtype=np.float32)

-        # Shift step labels so CNN learns peak positions better
+        # Shift step labels by half a window so the CNN learns peak positions better
         for p in left_peaks + right_peaks:
             if 0 <= p < len(self.step_labels) - (window_size // 2):
                 self.step_labels[p + (window_size // 2)] = 1
+
+        # Debugging information
         print("\n==== Debugging Step Extraction ====")
-        print("Step count dataset (first few rows):")
+        print("Step data (first few rows):")
         print(step_counts.head())

-        print("\nLeft foot peak extraction:")
-        print("Raw string from CSV:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values)
+        print("\nExtraction of peaks for the left foot:")
+        print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values)
         print("Extracted peaks:", left_peaks)

-        print("\nRight foot peak extraction:")
-        print("Raw string from CSV:", step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values)
+        print("\nExtraction of peaks for the right foot:")
+        print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values)
         print("Extracted peaks:", right_peaks)

         print("\nTotal peaks found: Left =", len(left_peaks), ", Right =", len(right_peaks))
@@ -70,14 +74,20 @@ def __len__(self):

     def __getitem__(self, idx):
         x = self.data[idx : idx + self.window_size]
         y = self.step_labels[idx : idx + self.window_size]
-        return x, y
+
+        # Data augmentation: Add slight noise to the data
+        noise = np.random.normal(0, 0.02, x.shape)
+        x_augmented = x + noise
+
+        return x_augmented, y


 def load_datasets(folder_path, window_size, batch_size):
     """
-    Reads (foldername)_left_acceleration_data.csv,
-    (foldername)_right_acceleration_data.csv,
-    scaled_step_counts.csv
+    Reads the following files:
+    (folder name)_left_acceleration_data.csv,
+    (folder name)_right_acceleration_data.csv,
+    scaled_step_counts.csv
     and creates a DataLoader with segments.
     """
     folder_name = os.path.basename(folder_path)
@@ -85,17 +95,21 @@
     right_file = os.path.join(folder_path, f"{folder_name}_right_acceleration_data.csv")
     step_file = os.path.join(folder_path, "scaled_step_counts.csv")

+    # Check if all required files exist
     if not (os.path.exists(left_file) and os.path.exists(right_file) and os.path.exists(step_file)):
         print(f"Folder {folder_name}: Missing files, skipping.")
         return None

+    # Load data from CSV files
     left_data = pd.read_csv(left_file)
     right_data = pd.read_csv(right_file)
     step_counts = pd.read_csv(step_file)

+    # Check if any of the dataframes are empty
     if left_data.empty or right_data.empty or step_counts.empty:
         print(f"Folder {folder_name}: Empty data, skipping.")
         return None

+    # Create dataset and DataLoader
     dataset = StepCounterDataset(left_data, right_data, step_counts, window_size)
     return DataLoader(dataset, batch_size=batch_size, shuffle=True)
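Note on the ENMO feature used above: a minimal sketch of what compute_enmo does, assuming only numpy and pandas and the X/Y/Z column layout of the acceleration CSVs (the sample values are made up):

import numpy as np
import pandas as pd

# Hypothetical accelerometer frames, in g: resting, resting, mid-swing
data = pd.DataFrame({"X": [0.0, 0.6, 1.2], "Y": [0.0, 0.8, 0.0], "Z": [1.0, 0.0, 1.0]})

norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1  # Euclidean norm minus 1 g
enmo = np.maximum(norm, 0)  # clip negatives: a resting sensor reads ~1 g, so ENMO stays at 0
print(enmo.round(3).tolist())  # [0.0, 0.0, 0.562]

The dataset then feeds the frame-to-frame ENMO difference for both feet into the CNN, so each sample is a window of shape (window_size, 2).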
diff --git a/cnn/model_step_counter.py b/cnn/model_step_counter.py
index eac927f..4c76e6c 100644
--- a/cnn/model_step_counter.py
+++ b/cnn/model_step_counter.py
@@ -1,41 +1,76 @@
 import torch.nn as nn
+import torch.nn.functional as F


 class StepCounterCNN(nn.Module):
     def __init__(self, window_size):
+        """
+        Initializes the StepCounterCNN model.
+
+        Args:
+            window_size (int): The size of the input window for the time series data.
+        """
         super().__init__()
-        self.conv1 = nn.Conv1d(2, 32, kernel_size=5, padding=2)
-        self.relu1 = nn.ReLU()
-        self.pool1 = nn.MaxPool1d(kernel_size=2)
-        self.batch_norm1 = nn.BatchNorm1d(32)
-
-        self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
-        self.relu2 = nn.ReLU()
-        self.pool2 = nn.MaxPool1d(kernel_size=2)
-        self.batch_norm2 = nn.BatchNorm1d(64)
-
-        fc1_input_size = (window_size // 4) * 64
-        self.fc1 = nn.Linear(fc1_input_size, 128)
-        self.relu3 = nn.ReLU()
-        self.fc2 = nn.Linear(128, 1)
-        self.sigmoid = nn.Sigmoid()
-        self.dropout = nn.Dropout(0.3)
+
+        # First convolutional layer
+        self.conv1 = nn.Conv1d(2, 32, kernel_size=7, padding=3)  # Input channels: 2, output channels: 32
+        self.bn1 = nn.BatchNorm1d(32)  # Batch normalization for the first layer
+        self.pool = nn.MaxPool1d(3, stride=2, padding=1)  # Max pooling layer
+
+        # Convolutional blocks (named "resblock" although they contain no skip connection)
+        self.resblock1 = self._make_resblock(32, 64)  # First block
+        self.resblock2 = self._make_resblock(64, 128, stride=2)  # Second block with stride 2
+
+        # Fully connected layers
+        final_length = window_size // 4  # Final sequence length after pooling and the strided block
+        self.fc1 = nn.Linear(128 * final_length, 64)  # First fully connected layer
+        self.fc2 = nn.Linear(64, 1)  # Second fully connected layer
+        self.sigmoid = nn.Sigmoid()  # Sigmoid activation for binary classification
+        self.dropout = nn.Dropout(0.5)  # Dropout for regularization
+
+    def _make_resblock(self, in_channels, out_channels, stride=1):
+        """
+        Creates a block of two convolutional layers with batch normalization and ReLU activation.
+        Note: despite the name, this is a plain sequential block; it does not add a residual
+        (skip) connection around the convolutions.
+
+        Args:
+            in_channels (int): Number of input channels.
+            out_channels (int): Number of output channels.
+            stride (int): Stride for the first convolutional layer.
+
+        Returns:
+            nn.Sequential: A sequential container representing the block.
+        """
+        return nn.Sequential(
+            nn.Conv1d(in_channels, out_channels, 3, stride=stride, padding=1),  # First convolutional layer
+            nn.BatchNorm1d(out_channels),  # Batch normalization
+            nn.ReLU(),  # ReLU activation
+            nn.Conv1d(out_channels, out_channels, 3, padding=1),  # Second convolutional layer
+            nn.BatchNorm1d(out_channels),  # Batch normalization
+            nn.ReLU(),  # ReLU activation
+            nn.Dropout(0.5),  # Dropout for regularization
+        )

     def forward(self, x):
-        x = self.conv1(x)
-        x = self.relu1(x)
-        x = self.pool1(x)
-        x = self.batch_norm1(x)
-
-        x = self.conv2(x)
-        x = self.relu2(x)
-        x = self.pool2(x)
-        x = self.batch_norm2(x)
-
-        x = x.flatten(start_dim=1)
-        x = self.fc1(x)
-        x = self.relu3(x)
-        x = self.dropout(x)
-        x = self.fc2(x)
-        x = self.sigmoid(x)
-        return x
+        """
+        Defines the forward pass of the model.
+
+        Args:
+            x (torch.Tensor): Input tensor of shape (batch_size, 2, window_size).
+
+        Returns:
+            torch.Tensor: Output tensor of shape (batch_size, 1) after applying the sigmoid function.
+        """
+        # Initial layer: convolution, batch norm, ReLU, and pooling
+        x = self.pool(F.relu(self.bn1(self.conv1(x))))
+
+        # Convolutional blocks
+        x = self.resblock1(x)  # Apply the first block
+        x = self.resblock2(x)  # Apply the second block (halves the sequence length)
+
+        # Classification head
+        x = x.flatten(1)  # Flatten the tensor for the fully connected layers
+        x = self.fc1(x)  # First fully connected layer
+        x = F.relu(x)  # ReLU activation
+        x = self.dropout(x)  # Dropout
+        x = self.fc2(x)  # Second fully connected layer
+        return self.sigmoid(x)  # Sigmoid activation for binary classification
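A quick shape sanity check for the rewritten model, as a minimal sketch assuming model_step_counter.py is importable and the window size of 64 used elsewhere in this PR:

import torch
from model_step_counter import StepCounterCNN

model = StepCounterCNN(window_size=64)
model.eval()  # put BatchNorm layers in eval mode for a deterministic check

x = torch.randn(8, 2, 64)  # batch of 8 windows, 2 channels (left/right features)
with torch.no_grad():
    y = model(x)
print(y.shape)  # torch.Size([8, 1]) - one probability per window

The final_length = window_size // 4 bookkeeping holds because the pooling layer halves the sequence (64 to 32) and the stride-2 convolution in the second block halves it again (32 to 16), so fc1 sees 128 * 16 inputs.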
+ """ + return nn.Sequential( + nn.Conv1d(in_channels, out_channels, 3, stride=stride, padding=1), # First convolutional layer + nn.BatchNorm1d(out_channels), # Batch normalization + nn.ReLU(), # ReLU activation + nn.Conv1d(out_channels, out_channels, 3, padding=1), # Second convolutional layer + nn.BatchNorm1d(out_channels), # Batch normalization + nn.ReLU(), # ReLU activation + nn.Dropout(0.5), # Dropout for regularization + ) def forward(self, x): - x = self.conv1(x) - x = self.relu1(x) - x = self.pool1(x) - x = self.batch_norm1(x) - - x = self.conv2(x) - x = self.relu2(x) - x = self.pool2(x) - x = self.batch_norm2(x) - - x = x.flatten(start_dim=1) - x = self.fc1(x) - x = self.relu3(x) - x = self.dropout(x) - x = self.fc2(x) - x = self.sigmoid(x) - return x + """ + Defines the forward pass of the model. + + Args: + x (torch.Tensor): Input tensor of shape (batch_size, 2, window_size). + + Returns: + torch.Tensor: Output tensor of shape (batch_size, 1) after applying the sigmoid function. + """ + # Initial layer + x = self.pool(F.relu(self.bn1(self.conv1(x)))) # Apply convolution, batch norm, ReLU, and pooling + + # Residual blocks + x = self.resblock1(x) # Apply first residual block + x = self.resblock2(x) # Apply second residual block + + # Classification + x = x.flatten(1) # Flatten the tensor for the fully connected layer + x = self.fc1(x) # Apply first fully connected layer + x = F.relu(x) # Apply ReLU activation + x = self.dropout(x) # Apply dropout + x = self.fc2(x) # Apply second fully connected layer + return self.sigmoid(x) # Apply sigmoid activation for binary classification diff --git a/cnn/prediction.py b/cnn/prediction.py new file mode 100644 index 0000000..2108d4f --- /dev/null +++ b/cnn/prediction.py @@ -0,0 +1,121 @@ +import torch +import numpy as np +import pandas as pd +import plotly.graph_objects as go +from scipy.signal import find_peaks +from sklearn.preprocessing import StandardScaler +import ast +from model_step_counter import StepCounterCNN + + +def load_model(model_path, device, window_size=64): + """Loads the trained model.""" + model = StepCounterCNN(window_size) + model.load_state_dict(torch.load(model_path, map_location=device)) + model.to(device) + model.eval() + return model + + +def compute_enmo(data): + """Computes the Euclidean Norm Minus One (ENMO) from accelerometer data.""" + norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1 + return np.maximum(norm, 0) + + +def process_data(left_csv, right_csv): + """Loads and processes acceleration data from left and right foot CSV files.""" + left_df = pd.read_csv(left_csv) + right_df = pd.read_csv(right_csv) + + return pd.DataFrame({"ENMO_left": compute_enmo(left_df), "ENMO_right": compute_enmo(right_df)}) + + +def detect_steps(model, device, data, window_size=64): + """Runs the step detection model on the given data.""" + data = torch.tensor(StandardScaler().fit_transform(data), dtype=torch.float32, device=device) + frame_probs = np.zeros(len(data), dtype=np.float32) + overlap_cnt = np.zeros(len(data), dtype=np.float32) + + with torch.no_grad(): + for start in range(len(data) - window_size): + window = data[start : start + window_size].T.unsqueeze(0) + frame_probs[start : start + window_size] += model(window).cpu().numpy().flatten() + overlap_cnt[start : start + window_size] += 1 + + frame_probs[overlap_cnt > 0] /= overlap_cnt[overlap_cnt > 0] + return find_peaks(frame_probs, height=0.02, distance=30, prominence=0.05)[0] + + +def parse_groundtruth_steps(groundtruth_csv): + """Parses the ground 
diff --git a/cnn/training.py b/cnn/training.py
index 1464046..cac1645 100644
--- a/cnn/training.py
+++ b/cnn/training.py
@@ -5,16 +5,13 @@
 import numpy as np
 import matplotlib.pyplot as plt
 import seaborn as sns
-import plotly.graph_objects as go
 from tqdm import tqdm
 from sklearn.model_selection import train_test_split
 from data_generator import load_datasets
 from model_step_counter import StepCounterCNN
 from torch.utils.data import DataLoader, ConcatDataset, Subset
-from sklearn.preprocessing import StandardScaler
 from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
-import pandas as pd
-from scipy.signal import find_peaks
+from prediction import main as prediction


 def load_all_datasets(root_folder, window_size, batch_size):
@@ -66,25 +63,70 @@
     return Subset(dataset, train_idx), Subset(dataset, test_idx)


-def train_step_counter(root_folder, window_size=256, batch_size=32, epochs=5, lr=0.001):
-    """
-    Loads data, trains the step counter CNN model, and evaluates loss.
-
-    Parameters:
-        root_folder (str): Path to the dataset directory.
-        window_size (int): Number of samples per input window.
-        batch_size (int): Number of samples per batch.
-        epochs (int): Number of training epochs.
-        lr (float): Learning rate for the optimizer.
-
-    Returns:
-        Tuple[StepCounterCNN, DataLoader, torch.device]: Trained model, test data loader, and device used.
-    """
+class EarlyStopping:
+    def __init__(self, patience=4, min_delta=0.005, path="best_model.pth"):
+        """
+        Initializes the EarlyStopping mechanism.
+
+        Args:
+            patience (int): Number of epochs to wait for improvement before stopping.
+            min_delta (float): Minimum improvement in validation loss to be considered significant.
+            path (str): File path where the best model will be saved.
+        """
+        self.patience = patience  # Number of epochs with no improvement before stopping
+        self.min_delta = min_delta  # Minimum required change in validation loss
+        self.path = path  # Path to save the best model
+        self.best_loss = float("inf")  # Initialize best validation loss as infinity
+        self.counter = 0  # Counter to track epochs without improvement
+        self.best_epoch = 0  # Stores the epoch with the best validation loss
+        self.best_train_loss = float("inf")  # Stores the training loss of the best model
+
+    def check(self, train_loss, val_loss, model, epoch):
+        """
+        Checks whether training should stop early based on validation loss.
+
+        Args:
+            train_loss (float): Current training loss.
+            val_loss (float): Current validation loss.
+            model (torch.nn.Module): The PyTorch model being trained.
+            epoch (int): The current epoch number.
+
+        Returns:
+            bool: True if training should stop, False otherwise.
+        """
+
+        # If the validation loss improves significantly, save the model
+        if val_loss < self.best_loss - self.min_delta:
+            self.best_loss = val_loss  # Update best validation loss
+            self.best_train_loss = train_loss  # Store corresponding training loss
+            self.best_epoch = epoch  # Store epoch number of the best model
+            self.counter = 0  # Reset counter since there was an improvement
+            torch.save(model.state_dict(), self.path)  # Save the best model checkpoint
+
+        # Check if overfitting occurs (training loss is much lower than validation loss)
+        elif val_loss - train_loss > self.min_delta and val_loss >= self.best_loss:
+            print("Early stopping triggered due to overfitting!")  # Print warning
+            model.load_state_dict(torch.load(self.path))  # Load the best saved model
+            return True  # Stop training
+
+        else:
+            # No significant improvement, increment counter
+            self.counter += 1
+
+            # If patience limit is reached, stop training
+            if self.counter >= self.patience:
+                print(f"Early stopping triggered! Best model from epoch {self.best_epoch + 1} loaded from {self.path}")
+                model.load_state_dict(torch.load(self.path))  # Load the best model
+                return True  # Stop training
+
+        return False  # Continue training
+
+
+def train_step_counter(root_folder, window_size=256, batch_size=32, epochs=5, lr=0.001, patience=4):
     combined_dataset = load_all_datasets(root_folder, window_size, batch_size)
     if combined_dataset is None:
         return None, None, None

-    # Split dataset into training and testing sets
     train_ds, test_ds = split_dataset(combined_dataset)
     train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
     test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False)
@@ -92,12 +134,11 @@
     device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
     model = StepCounterCNN(window_size).to(device).float()

-    # Define loss function and optimizer
     criterion = nn.BCELoss(weight=torch.tensor([5.0], device=device).float())
     optimizer = optim.Adam(model.parameters(), lr=lr)
     train_losses, test_losses = [], []
+    early_stopping = EarlyStopping(patience=patience)

-    # Training loop
     model.train()
     for ep in range(epochs):
         ep_loss = 0.0
@@ -111,10 +152,8 @@
             ep_loss += loss.item()
             pbar.update(1)

-        # Store training loss
         train_losses.append(ep_loss / len(train_loader))

-        # Evaluate test loss
         model.eval()
         with torch.no_grad():
             test_loss = sum(
@@ -126,15 +165,18 @@
         test_losses.append(test_loss)
         print(f"\U0001F535 Epoch {ep+1}, Train Loss: {train_losses[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}")

-    # Plot loss curves
+        if early_stopping.check(train_losses[-1], test_losses[-1], model, ep):
+            break
+
     plt.plot(train_losses, label="Training Loss")
     plt.plot(test_losses, label="Test Loss", linestyle="dashed")
-    plt.legend(), plt.grid(), plt.show()
+    plt.xlabel("Epoch")
+    plt.ylabel("Loss")
+    plt.legend()
+    plt.grid()
+    plt.show()

-    # Save model
-    save_path = os.path.join(root_folder, "step_counter_cnn.pth")
-    torch.save(model.state_dict(), save_path)
-    print(f"Model saved: {save_path}")
+    print(f"The model was saved at epoch {early_stopping.best_epoch + 1}.")

     return model, test_loader, device
@@ -189,142 +231,22 @@
     plt.show()


-def evaluate_full_run_6channels_against_groundtruth_plotly(
-    model, device, left_csv, right_csv, stepcount_csv, window_size=256, peak_distance=10, peak_height=0.5
-):
-    """
-    Evaluates the full run by comparing the model's step detection against ground truth.
-    Generates an interactive Plotly graph with acceleration data and detected steps.
-
-    Parameters:
-        model (torch.nn.Module): Trained step detection model.
-        device (torch.device): The device (CPU/GPU) for processing.
-        left_csv (str): Path to CSV file containing left foot acceleration data.
-        right_csv (str): Path to CSV file containing right foot acceleration data.
-        stepcount_csv (str): Path to CSV file containing ground truth step data.
-        window_size (int, optional): Size of the sliding window for processing. Default is 256.
-        peak_distance (int, optional): Minimum distance between detected peaks (steps). Default is 10.
-        peak_height (float, optional): Minimum height for peak detection. Default is 0.5.
-
-    Outputs:
-        - Displays an interactive Plotly graph with acceleration data and step detection results.
-        - Prints debugging information for CNN predictions.
-    """
-    left_df = pd.read_csv(left_csv)
-    right_df = pd.read_csv(right_csv)
-    step_counts = pd.read_csv(stepcount_csv)
-
-    def compute_enmo(data):
-        """Computes the Euclidean Norm Minus One (ENMO) from accelerometer data."""
-        norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
-        return np.maximum(norm, 0)  # Set negative values to 0
-
-    # Compute ENMO for left and right foot acceleration
-    left_df["ENMO"] = compute_enmo(left_df)
-    right_df["ENMO"] = compute_enmo(right_df)
-
-    # Create a DataFrame with only ENMO values
-    combined_df = pd.DataFrame({"ENMO_left": left_df["ENMO"], "ENMO_right": right_df["ENMO"]})
-
-    def extract_peaks(peaks_str):
-        """Extracts peak values from a string representation of a list."""
-        import ast
-
-        if isinstance(peaks_str, str) and peaks_str.startswith("["):
-            return ast.literal_eval(peaks_str)
-        return []
-
-    # Extract ground truth step frames
-    left_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values[0])
-    right_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values[0])
-    groundtruth_frames = set(left_peaks + right_peaks)
-
-    # Normalize data using StandardScaler
-    sc = StandardScaler()
-    arr = sc.fit_transform(combined_df.values)  # shape=(Frames, 2)
-    arr = torch.tensor(arr, dtype=torch.float32, device=device)
-
-    model.eval()
-    N = len(arr)
-    frame_probs = np.zeros(N, dtype=np.float32)
-    overlap_cnt = np.zeros(N, dtype=np.float32)
-
-    # Sliding window approach for step detection
-    with torch.no_grad():
-        for start in tqdm(range(N - window_size), desc="FullRunProcessing"):
-            window_ = arr[start : start + window_size, :].permute(1, 0).unsqueeze(0)
-            out = model(window_)
-            frame_probs[start : start + window_size] += out.squeeze(0).cpu().numpy()
-            overlap_cnt[start : start + window_size] += 1
-
-    # Normalize probabilities by overlap count
-    valid = overlap_cnt > 0
-    frame_probs[valid] /= overlap_cnt[valid]
-
-    # Detect peaks in model output probabilities
-    peaks, _ = find_peaks(frame_probs, height=0.02, distance=30, prominence=0.05)
-    detected_frames = set(peaks.tolist())
-
-    # Create Plotly figure
-    fig = go.Figure()
-    time_axis = np.arange(len(combined_df))
-
-    for col in combined_df.columns:
-        fig.add_trace(go.Scatter(x=time_axis, y=combined_df[col], mode="lines", name=col))
-
-    fig.add_trace(
-        go.Scatter(
-            x=list(detected_frames),
-            y=[frame_probs[i] for i in detected_frames],
-            mode="markers",
-            name="Detected Steps",
-            marker=dict(color="red", size=8),
-        )
-    )
-
-    fig.add_trace(
-        go.Scatter(
-            x=list(groundtruth_frames),
-            y=[frame_probs[i] for i in groundtruth_frames],
-            mode="markers",
-            name="Ground Truth Steps",
-            marker=dict(color="green", symbol="x", size=8),
-        )
-    )
-
-    fig.update_layout(
-        title="Accelerometer Data with Step Detection",
-        xaxis_title="Frame",
-        yaxis_title="Acceleration / Probability",
-        legend_title="Legend",
-        template="plotly_white",
-    )
-
-    fig.show()
-
-    print("\n==== Debugging CNN Predictions ====")
-    print("Frame probabilities (first 20 values):", frame_probs[:20])
-    print("Max probability from CNN:", np.max(frame_probs))
-    print("Mean probability from CNN:", np.mean(frame_probs))
-
-
 def main():
     root_folder = "D:\Step-counter\Output"
     window_size = 64
-    batch_size = 32
-    epochs = 5
+    batch_size = 128
+    epochs = 20

-    model, test_loader, device = train_step_counter(root_folder, window_size, batch_size, epochs)
+    model, test_loader, device = train_step_counter(root_folder, window_size, batch_size, epochs, 1e-3)

     if model is not None and test_loader is not None:
         evaluate_model(model, test_loader, device)

-        left_csv = "D:\Step-counter\Output\GX010029\GX010029_left_acceleration_data.csv"
-        right_csv = "D:\Step-counter\Output\GX010029\GX010029_right_acceleration_data.csv"
-        stepcount_csv = "D:\Step-counter\Output\GX010029\scaled_step_counts.csv"
+        model_path = "best_model.pth"
+        left_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_left_acceleration_data.csv"
+        right_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_right_acceleration_data.csv"
+        stepcount_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/scaled_step_counts.csv"

-        evaluate_full_run_6channels_against_groundtruth_plotly(
-            model, device, left_csv, right_csv, stepcount_csv, window_size=window_size, peak_distance=10, peak_height=0.4
-        )
+        prediction(model_path, left_csv, right_csv, stepcount_csv)


 if __name__ == "__main__":
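A minimal usage sketch of the new EarlyStopping helper, with hypothetical losses standing in for a real training loop (any nn.Module works for the checkpointing):

import torch.nn as nn

model = nn.Linear(4, 1)  # stand-in for StepCounterCNN
stopper = EarlyStopping(patience=2, min_delta=0.005, path="best_model.pth")

# Hypothetical (train_loss, val_loss) pairs per epoch
for epoch, (tr, va) in enumerate([(0.9, 0.8), (0.7, 0.6), (0.65, 0.61), (0.64, 0.62)]):
    if stopper.check(tr, va, model, epoch):
        break  # the best checkpoint has already been re-loaded into model
print("Best epoch:", stopper.best_epoch + 1)  # 2

One design note: nn.BCELoss(weight=torch.tensor([5.0])) in train_step_counter rescales every element's loss by the same factor, so it does not emphasize the rare step frames over non-step frames; a per-class emphasis would need, for example, pos_weight on nn.BCEWithLogitsLoss, which expects raw logits, i.e. a model without the final sigmoid.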
diff --git a/src/generator.py b/src/generator.py
index 096a8b4..32bb502 100644
--- a/src/generator.py
+++ b/src/generator.py
@@ -1,29 +1,7 @@
 import os
 import csv
 from tqdm import tqdm
-
-
-def is_video_in_csv(csv_file, video_filename):
-    """
-    Checks if a video's metadata is already in the CSV file.
-
-    Parameters:
-        csv_file (str): Path to the CSV file.
-        video_filename (str): Name of the video file.
-
-    Returns:
-        bool: True if the video is already in the CSV, False otherwise.
-    """
-    if not os.path.exists(csv_file):
-        # If the CSV file does not exist, treat it as empty
-        return False
-
-    with open(csv_file, "r") as file:
-        reader = csv.DictReader(file)
-        for row in reader:
-            if row["filename"] == video_filename:
-                return True
-    return False
+from metadata_handler import is_video_in_csv  # Import the function from metadata_handler.py


 def count_videos_in_directory(root_dir):
@@ -82,7 +60,7 @@
     for video_path in video_files:
         video_filename = os.path.basename(video_path)

-        # Check if the video is already in the CSV
+        # Check if the video is already in the CSV (using imported function)
         if is_video_in_csv(metadata_csv, video_filename):
             print(f"Skipping: {video_filename} (already in metadata).")
             skipped += 1
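The duplicate helper is gone; generator.py now relies on metadata_handler.py for the lookup. A minimal sketch of the expected call, assuming metadata_handler exposes the same signature as the removed local version (the file names here are hypothetical):

from metadata_handler import is_video_in_csv

# The metadata CSV is expected to contain a "filename" column,
# as in the removed csv.DictReader-based implementation above.
if not is_video_in_csv("metadata.csv", "GX010029.MP4"):
    print("new video, will be processed")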
diff --git a/src/main.py b/src/main.py
index 664a017..c4f3284 100644
--- a/src/main.py
+++ b/src/main.py
@@ -108,4 +108,4 @@


 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
diff --git a/src/metadata_handler.py b/src/metadata_handler.py
index 9816a25..57dbcf3 100644
--- a/src/metadata_handler.py
+++ b/src/metadata_handler.py
@@ -126,4 +126,4 @@
     for item in summary["non_matches"]:
         print(
             f"  - {item['filename']}: Manual Steps = {item['manual_steps']}, Calculated Steps = {item['calculated_steps']}"
-        )
\ No newline at end of file
+        )
diff --git a/src/peak_picker.py b/src/peak_picker.py
index 4bc7cf7..3d9b566 100644
--- a/src/peak_picker.py
+++ b/src/peak_picker.py
@@ -9,41 +9,46 @@
 def load_accelerometer_data(file_path):
     """Loads accelerometer data from CSV."""
     return pd.read_csv(file_path)


+# Normalize accelerometer data
 def normalize_accelerometer_data(data):
     """Computes ENMO (Euclidean Norm Minus One) for accelerometer data."""
     norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
     return np.maximum(norm, 0)  # Negative values set to zero


+# Detect peaks in normalized data
 def detect_peaks(normalized_data, distance=50, prominence=0.5):
     """Detects step peaks in the normalized accelerometer signal."""
     peaks, _ = find_peaks(normalized_data, distance=distance, prominence=prominence)
     return peaks


+# Load ground truth data
 def load_ground_truth(file_path):
     """Loads total step count from ground truth CSV."""
     ground_truth = pd.read_csv(file_path)
     total_row = ground_truth[ground_truth["Joint"] == "Total"]
-
+
     if total_row.empty:
         raise ValueError("The ground truth file does not have a 'Total' row.")

     total_steps = int(total_row.iloc[0]["Detected Steps"])
     return total_steps


+# Compare detected steps to ground truth
 def compare_with_ground_truth(detected_steps, ground_truth_steps, foot="both"):
     """Compares detected peaks with ground truth steps."""
     print(f"{foot.capitalize()} Foot - Detected Steps: {detected_steps}")
     print(f"Ground Truth Steps: {ground_truth_steps}")
-
+
     accuracy = (detected_steps / ground_truth_steps) * 100
     print(f"Accuracy: {accuracy:.2f}%")
-
+
     return accuracy


+# Process all folders
 def process_folders(root_dir):
     """Processes all video session folders."""
@@ -93,7 +98,7 @@
         plt.figure(figsize=(12, 6))
         plt.plot(left_norm, label="Left Foot ENMO", color="blue", alpha=0.7)
         plt.plot(right_norm, label="Right Foot ENMO", color="red", alpha=0.7)
-
+
         plt.scatter(left_peaks, left_norm[left_peaks], color="green", label="Left Foot Peaks", marker="x")
         plt.scatter(right_peaks, right_norm[right_peaks], color="orange", label="Right Foot Peaks", marker="o")
@@ -103,10 +108,12 @@
         plt.legend()
         plt.show()


+# Main function
 def main():
     root_dir = "D:\\Step-counter\\Output"  # Adjust path to dataset directory
     process_folders(root_dir)

+
 if __name__ == "__main__":
     main()
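The peak picker hinges on scipy.signal.find_peaks with distance and prominence constraints. A minimal sketch on a synthetic ENMO-like signal (the bump positions and thresholds are illustrative only, not the tuned values):

import numpy as np
from scipy.signal import find_peaks

t = np.arange(300)
# Three Gaussian bumps standing in for step impacts
signal = sum(np.exp(-((t - c) ** 2) / 50.0) for c in (60, 150, 240))

# distance suppresses double-counting within one stride; prominence filters small wiggles
peaks, _ = find_peaks(signal, distance=50, prominence=0.5)
print(peaks)  # [ 60 150 240] - one detection per bump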
diff --git a/src/video_processing.py b/src/video_processing.py
index d15927b..c3fefae 100644
--- a/src/video_processing.py
+++ b/src/video_processing.py
@@ -273,4 +273,4 @@
         writer.writerow(["right_foot_index", right_steps, list(peaks_data.get("right_foot_index", []))])
         writer.writerow(["Total", total_steps, ""])

-    print(f"Step counts saved: {steps_csv}")
\ No newline at end of file
+    print(f"Step counts saved: {steps_csv}")
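For reference, the scaled_step_counts.csv consumed by load_ground_truth and parse_groundtruth_steps is produced by writerow calls like the ones above. A minimal sketch of the resulting layout (the column names are inferred from the readers elsewhere in this PR; the numbers are made up):

import csv

with open("scaled_step_counts.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Joint", "Detected Steps", "Peaks"])
    writer.writerow(["left_foot_index", 3, [102, 187, 265]])  # the list is stored as its string repr
    writer.writerow(["right_foot_index", 2, [141, 226]])
    writer.writerow(["Total", 5, ""])

The Peaks column therefore holds Python-list strings such as "[102, 187, 265]", which is exactly why the readers go through ast.literal_eval.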