diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 1ae89ed..0205a35 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -30,12 +30,12 @@ jobs: run: | choco install visualstudio2019buildtools --package-parameters "--add Microsoft.VisualStudio.Workload.VCTools" choco install cmake - pip install -e . # Install the project in editable mode + pip install -r requirements.txt pip install matplotlib --prefer-binary - name: Install dependencies on other OSes if: matrix.os != 'windows-latest' - run: pip install -e . + run: pip install -r requirements.txt - name: Lint with ruff run: | diff --git a/cnn/data_generator.py b/cnn/data_generator.py index e342036..10ee0be 100644 --- a/cnn/data_generator.py +++ b/cnn/data_generator.py @@ -7,31 +7,46 @@ def compute_enmo(data): - # Calculate the ENMO value for the data + """ + Compute the ENMO value for accelerometer data. + + Parameters: + data (DataFrame): Data with 'X', 'Y', and 'Z' columns. + + Returns: + ndarray: ENMO values with negatives set to 0. + """ norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1 - return np.maximum(norm, 0) # Set negative values to 0 + return np.maximum(norm, 0) class StepCounterDataset(Dataset): + """ + Dataset for step counting using left and right accelerometer data. + """ def __init__(self, left_data, right_data, step_counts, window_size): - self.window_size = window_size # Ensure window_size is assigned + """ + Initialize the dataset by computing ENMO, differences, and step labels. + + Parameters: + left_data (DataFrame): Left foot accelerometer data. + right_data (DataFrame): Right foot accelerometer data. + step_counts (DataFrame): CSV data containing step peaks. + window_size (int): Size of the data window. + """ + self.window_size = window_size - # Calculate ENMO for both feet left_data["ENMO"] = compute_enmo(left_data) right_data["ENMO"] = compute_enmo(right_data) - # Calculate the difference in ENMO values left_data["ENMO_DIFF"] = left_data["ENMO"].diff().fillna(0) right_data["ENMO_DIFF"] = right_data["ENMO"].diff().fillna(0) - # Stack the ENMO differences for both feet self.data = np.hstack((left_data[["ENMO_DIFF"]], right_data[["ENMO_DIFF"]])) - # Normalize the data self.scaler = StandardScaler() self.data = self.scaler.fit_transform(self.data) - # Extract step labels def extract_peaks(peaks_str): if isinstance(peaks_str, str): try: @@ -40,76 +55,77 @@ def extract_peaks(peaks_str): return [] return [] - # Extract peaks for left and right feet left_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values[0]) right_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values[0]) - # Create step labels self.step_labels = np.zeros(len(self.data), dtype=np.float32) - - # Shift step labels to improve peak positions for CNN for p in left_peaks + right_peaks: if 0 <= p < len(self.step_labels) - (window_size // 2): self.step_labels[p + (window_size // 2)] = 1 - # Debugging information print("\n==== Debugging Step Extraction ====") print("Step data (first few rows):") print(step_counts.head()) - print("\nExtraction of peaks for the left foot:") - print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values) + print("Raw data:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values) print("Extracted peaks:", left_peaks) - print("\nExtraction of peaks for the right foot:") - print("Raw data from CSV:", 
step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values) + print("Raw data:", step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values) + print("Extracted peaks:", right_peaks) - print("\nTotal peaks found: Left =", len(left_peaks), ", Right =", len(right_peaks)) print("==================================\n") def __len__(self): + """ + Return the number of data segments. + """ return len(self.data) - self.window_size def __getitem__(self, idx): + """ + Get a data segment and its step labels with added noise. + + Parameters: + idx (int): Starting index of the segment. + + Returns: + tuple: (augmented data segment, corresponding labels). + """ x = self.data[idx : idx + self.window_size] y = self.step_labels[idx : idx + self.window_size] - - # Data augmentation: Add slight noise to the data noise = np.random.normal(0, 0.02, x.shape) x_augmented = x + noise - return x_augmented, y def load_datasets(folder_path, window_size, batch_size): """ - Reads the following files: - (Folder name)_left_acceleration_data.csv, - (Folder name)_right_acceleration_data.csv, - scaled_step_counts.csv - and creates a DataLoader with segments. + Load accelerometer and step count CSV files from a folder and create a DataLoader. + + Parameters: + folder_path (str): Path to the folder containing the CSV files. + window_size (int): Size of each data window. + batch_size (int): Batch size for the DataLoader. + + Returns: + DataLoader: DataLoader for the dataset, or None if files are missing/empty. """ folder_name = os.path.basename(folder_path) left_file = os.path.join(folder_path, f"{folder_name}_left_acceleration_data.csv") right_file = os.path.join(folder_path, f"{folder_name}_right_acceleration_data.csv") step_file = os.path.join(folder_path, "scaled_step_counts.csv") - # Check if all required files exist if not (os.path.exists(left_file) and os.path.exists(right_file) and os.path.exists(step_file)): print(f"Folder {folder_name}: Missing files, skipping.") return None - # Load data from CSV files left_data = pd.read_csv(left_file) right_data = pd.read_csv(right_file) step_counts = pd.read_csv(step_file) - # Check if any of the dataframes are empty if left_data.empty or right_data.empty or step_counts.empty: print(f"Folder {folder_name}: Empty data, skipping.") return None - # Create dataset and DataLoader dataset = StepCounterDataset(left_data, right_data, step_counts, window_size) return DataLoader(dataset, batch_size=batch_size, shuffle=True) diff --git a/cnn/prediction.py b/cnn/prediction.py index 2108d4f..37f6a33 100644 --- a/cnn/prediction.py +++ b/cnn/prediction.py @@ -9,7 +9,17 @@ def load_model(model_path, device, window_size=64): - """Loads the trained model.""" + """ + Load a pre-trained StepCounterCNN model. + + Parameters: + model_path (str): Path to the saved model weights. + device (torch.device): Device to load the model onto. + window_size (int, optional): Input window size for the model. Defaults to 64. + + Returns: + StepCounterCNN: The loaded model, set to evaluation mode. + """ model = StepCounterCNN(window_size) model.load_state_dict(torch.load(model_path, map_location=device)) model.to(device) @@ -18,13 +28,36 @@ def load_model(model_path, device, window_size=64): def compute_enmo(data): - """Computes the Euclidean Norm Minus One (ENMO) from accelerometer data.""" + """ + Compute the Euclidean Norm Minus One (ENMO) for accelerometer data. 
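+ (Worked example: a stationary sensor reading X=0, Y=0, Z=1 g has Euclidean norm 1, so ENMO = max(1 - 1, 0) = 0.)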
+ + This function calculates the Euclidean norm of the 'X', 'Y', and 'Z' columns, + subtracts 1 from the result, and returns the maximum between the computed value and 0. + + Parameters: + data (DataFrame): Accelerometer data with 'X', 'Y', and 'Z' columns. + + Returns: + np.ndarray: Array of ENMO values. + """ norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1 return np.maximum(norm, 0) def process_data(left_csv, right_csv): - """Loads and processes acceleration data from left and right foot CSV files.""" + """ + Load and process accelerometer data from left and right CSV files. + + Reads the CSV files for both left and right foot data, computes the ENMO for each, + and returns a DataFrame combining the results. + + Parameters: + left_csv (str): Path to the left foot accelerometer CSV. + right_csv (str): Path to the right foot accelerometer CSV. + + Returns: + DataFrame: A DataFrame with columns 'ENMO_left' and 'ENMO_right'. + """ left_df = pd.read_csv(left_csv) right_df = pd.read_csv(right_csv) @@ -32,7 +65,21 @@ def process_data(left_csv, right_csv): def detect_steps(model, device, data, window_size=64): - """Runs the step detection model on the given data.""" + """ + Run the step detection model on processed accelerometer data. + + The function scales the data using a StandardScaler, applies the model over sliding windows, + aggregates the output probabilities, and identifies step peaks using a threshold. + + Parameters: + model (StepCounterCNN): The loaded step detection model. + device (torch.device): Device for computation. + data (DataFrame): Processed accelerometer data. + window_size (int, optional): Size of the sliding window. Defaults to 64. + + Returns: + ndarray: Indices of detected step peaks. + """ data = torch.tensor(StandardScaler().fit_transform(data), dtype=torch.float32, device=device) frame_probs = np.zeros(len(data), dtype=np.float32) overlap_cnt = np.zeros(len(data), dtype=np.float32) @@ -48,7 +95,18 @@ def detect_steps(model, device, data, window_size=64): def parse_groundtruth_steps(groundtruth_csv): - """Parses the ground truth step data from CSV.""" + """ + Parse ground truth step data from a CSV file. + + Reads the ground truth CSV, extracts the 'Peaks' column from the first two rows, + evaluates the string representations, and returns a set of ground truth step indices. + + Parameters: + groundtruth_csv (str): Path to the ground truth CSV file. + + Returns: + set: A set containing ground truth step indices. + """ groundtruth_df = pd.read_csv(groundtruth_csv, nrows=2) # Only consider the first two rows steps = set() for peak_str in groundtruth_df["Peaks"].dropna(): @@ -60,7 +118,17 @@ def parse_groundtruth_steps(groundtruth_csv): def plot_results(data, detected_steps, groundtruth_steps): - """Generates an interactive Plotly visualization of acceleration data, detected steps, and ground truth.""" + """ + Create an interactive Plotly visualization of acceleration data and step detections. + + Plots the acceleration signals for each channel, overlays markers for detected steps and + ground truth steps, and displays the interactive figure. + + Parameters: + data (DataFrame): Combined accelerometer data (e.g., 'ENMO_left' and 'ENMO_right'). + detected_steps (ndarray): Indices of steps detected by the model. + groundtruth_steps (set): Set of ground truth step indices. 
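+
+ Example (illustrative only; paths and peak indices are hypothetical):
+ >>> data = process_data("left.csv", "right.csv")
+ >>> plot_results(data, detected_steps=np.array([120, 240]), groundtruth_steps={118, 236})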
+ """ fig = go.Figure() time_axis = np.arange(len(data)) @@ -102,7 +170,18 @@ def plot_results(data, detected_steps, groundtruth_steps): def main(model_path, left_csv, right_csv, groundtruth_csv): - """Runs the full step detection pipeline and visualization.""" + """ + Execute the full step detection pipeline and visualization. + + Loads the trained model, processes accelerometer data from left and right CSV files, + runs the step detection, parses ground truth step data, and visualizes the results. + + Parameters: + model_path (str): Path to the saved model weights. + left_csv (str): Path to the left foot accelerometer CSV. + right_csv (str): Path to the right foot accelerometer CSV. + groundtruth_csv (str): Path to the ground truth CSV file. + """ device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model = load_model(model_path, device) data = process_data(left_csv, right_csv) @@ -119,3 +198,4 @@ def main(model_path, left_csv, right_csv, groundtruth_csv): groundtruth_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/scaled_step_counts.csv" main(model_path, left_csv, right_csv, groundtruth_csv) + diff --git a/src/clap_detection_methods.py b/src/clap_detection_methods.py index 9f4d6a5..9fdee37 100644 --- a/src/clap_detection_methods.py +++ b/src/clap_detection_methods.py @@ -5,10 +5,18 @@ from moviepy import VideoFileClip -# Method to plot the audio waveform of a video file +def plot_audio_waveform_from_array(audio_array, fps, num_segments=4): + """ + Plot the audio waveform from an array while highlighting the first and last segments. + Converts stereo audio to mono if needed, creates a time axis from fps, and shades the + first and last segments based on the total number of segments. -def plot_audio_waveform_from_array(audio_array, fps, num_segments=4): + Parameters: + audio_array (np.ndarray): Input audio data. + fps (float): Sampling rate (frames per second). + num_segments (int): Number of segments to split the audio into. + """ # If stereo, convert to mono by averaging channels if audio_array.ndim > 1: audio_data = np.mean(audio_array, axis=1) @@ -16,24 +24,21 @@ def plot_audio_waveform_from_array(audio_array, fps, num_segments=4): audio_data = audio_array audio_data = np.abs(audio_data) - duration = len(audio_data) / fps time_axis = np.linspace(0, duration, num=len(audio_data)) plt.figure(figsize=(12, 4)) plt.plot(time_axis, audio_data, color="steelblue", label="Waveform") - # Calculate segment length + # Calculate segment length and define segment boundaries segment_length = len(audio_data) // num_segments - first_segment_start_time = 0.0 first_segment_end_time = segment_length / fps last_segment_start_time = (num_segments - 1) * segment_length / fps last_segment_end_time = duration - # Color the first and last segments + # Highlight first and last segments plt.axvspan(first_segment_start_time, first_segment_end_time, color="red", alpha=0.15, label="Erstes Segment") - plt.axvspan(last_segment_start_time, last_segment_end_time, color="green", alpha=0.15, label="Letztes Segment") plt.title("Audio Waveform with First & Last Segment Shaded") @@ -45,19 +50,28 @@ def plot_audio_waveform_from_array(audio_array, fps, num_segments=4): plt.show() -# Method to detect Claps using segmentation (first and last segments only) +def detect_claps_first_last_segments(audio_array, fps, num_segments): + """ + Detect claps by locating the peak amplitude in the first and last segments. 
+ Converts stereo audio to mono, divides the audio into segments, and finds the maximum + amplitude (assumed to be a clap) in the first and last segments. -def detect_claps_first_last_segments(audio_array, fps, num_segments): + Parameters: + audio_array (np.ndarray): Input audio data. + fps (float): Sampling rate (frames per second). + num_segments (int): Number of segments to split the audio into. - # convert stereo to mono + Returns: + list: Tuples containing (clap time in seconds, frame index) for each detected clap. + """ + # Convert stereo to mono audio_data = np.mean(audio_array, axis=1) - abs_audio = np.abs(audio_data) segment_length = len(abs_audio) // num_segments claps = [] - # Process only the first and last segments + # Only process the first and last segments segments_to_process = [0, num_segments - 1] for segment_index in segments_to_process: start_idx = segment_index * segment_length @@ -65,26 +79,34 @@ def detect_claps_first_last_segments(audio_array, fps, num_segments): segment = abs_audio[start_idx:end_idx] if len(segment) > 0: - # Find the maximum value in the segment max_idx = segment.argmax() - clap_time = (start_idx + max_idx) / float(fps) claps.append((clap_time, start_idx + max_idx)) return claps -# Method to process videos in a directory +def process_videos_in_directory(directory_path, num_segments): + """ + Process video files by plotting their audio waveforms and detecting claps. + Iterates through .mp4 files in the specified directory. For each video, it extracts the + audio track, plots the waveform, detects claps in the first and last segments, and records + the results. -def process_videos_in_directory(directory_path, num_segments): + Parameters: + directory_path (str): Path to the folder containing video files. + num_segments (int): Number of segments to split the audio for clap detection. + + Returns: + list: Dictionaries with video filename, detected clap times, frame indices, and duration between claps. 
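+
+ Example (illustrative sketch; the directory name is hypothetical):
+ >>> results = process_videos_in_directory("videos/", num_segments=4)
+ >>> pd.DataFrame(results).to_csv("clap_results.csv", index=False)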
+ """ video_extension = ".mp4" clap_results = [] for filename in os.listdir(directory_path): file_path = os.path.join(directory_path, filename) if os.path.isfile(file_path) and os.path.splitext(filename.lower())[1] == video_extension: - print(f"\nProcessing video: {filename}") try: @@ -99,7 +121,7 @@ def process_videos_in_directory(directory_path, num_segments): plot_audio_waveform_from_array(audio_array, fps) - # Now detect claps + # Detect claps in first and last segments claps = detect_claps_first_last_segments(audio_array, fps=fps, num_segments=num_segments) if claps: @@ -107,17 +129,14 @@ def process_videos_in_directory(directory_path, num_segments): print(f" Clap detected at {c_time:.2f} seconds (frame {frame})") duration_between_claps = claps[1][0] - claps[0][0] - - clap_results.append( - { - "Filename": filename, - "Start Clap Seconds": claps[0][0], - "End Clap Seconds": claps[1][0], - "Start Clap Frame": claps[0][1], - "End Clap Frame": claps[1][1], - "Duration Between Claps": duration_between_claps, - } - ) + clap_results.append({ + "Filename": filename, + "Start Clap Seconds": claps[0][0], + "End Clap Seconds": claps[1][0], + "Start Clap Frame": claps[0][1], + "End Clap Frame": claps[1][1], + "Duration Between Claps": duration_between_claps, + }) except Exception as e: print(f" Error processing {filename}: {e}") @@ -126,9 +145,20 @@ def process_videos_in_directory(directory_path, num_segments): def load_accelerometer_data(file_path, sampling_frequency=256): - data = pd.read_csv(file_path, delimiter=",", skiprows=11, names=["X", "Y", "Z"], dtype=str) + """ + Load accelerometer data from a CSV file and create a time axis. - # Convert to floats + Reads the CSV (skipping the first 11 rows), converts the data to floats (adjusting for commas), + and generates a time axis based on the sampling frequency. + + Parameters: + file_path (str): Path to the accelerometer CSV file. + sampling_frequency (int): Sampling rate for the accelerometer data. + + Returns: + tuple: (DataFrame with accelerometer data, time axis as a Series) + """ + data = pd.read_csv(file_path, delimiter=",", skiprows=11, names=["X", "Y", "Z"], dtype=str) data = data.map(lambda x: x.replace(",", ".")).astype(float) data.reset_index(drop=True, inplace=True) time_seconds = data.index / sampling_frequency @@ -136,16 +166,34 @@ def load_accelerometer_data(file_path, sampling_frequency=256): return data, time_seconds -# Function to normalize data def normalize_data(data): + """ + Compute the Euclidean norm of accelerometer data from X, Y, and Z columns. + + Parameters: + data (DataFrame): Accelerometer data with columns "X", "Y", and "Z". + + Returns: + np.ndarray: Array of normalized acceleration values. + """ return np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) -# Function to plot data for a specific time interval -def plot_accelerometer_data_interval( - cleaned_data, time_seconds, start_time=None, end_time=None, title_suffix="Full Duration", plot_each_axis=False -): +def plot_accelerometer_data_interval(cleaned_data, time_seconds, start_time=None, end_time=None, title_suffix="Full Duration", plot_each_axis=False): + """ + Plot accelerometer data over a specified time interval. + + Can either plot individual X, Y, and Z axes or plot the normalized data. The displayed + time interval is controlled by start_time and end_time. + Parameters: + cleaned_data (DataFrame): Accelerometer data. + time_seconds (array-like): Corresponding time values. + start_time (float, optional): Start time for plotting. 
+ end_time (float, optional): End time for plotting. + title_suffix (str): Text to append to the plot title. + plot_each_axis (bool): If True, plot each axis separately. + """ if start_time is not None or end_time is not None: mask = (time_seconds >= (start_time or time_seconds.min())) & (time_seconds <= (end_time or time_seconds.max())) filtered_data = cleaned_data[mask] @@ -155,7 +203,6 @@ def plot_accelerometer_data_interval( filtered_time = time_seconds if plot_each_axis: - # Plot each axis plt.figure(figsize=(15, 10)) for i, (axis, color) in enumerate(zip(["X", "Y", "Z"], ["blue", "green", "red"]), 1): plt.subplot(3, 1, i) @@ -168,7 +215,6 @@ plt.tight_layout() plt.show() - # Plot normalized data normalized_data = normalize_data(filtered_data) plt.figure(figsize=(15, 5)) plt.plot(filtered_time, normalized_data, label="Norm (X, Y, Z)", color="purple") @@ -179,9 +225,21 @@ plt.show() -# Function to find peaks in the normalized data def find_peaks_in_interval(normalized_data, time_seconds, start_time=None, end_time=None): - # Filter data by interval + """ + Find peaks in normalized accelerometer data within a specified interval. + + The function splits the data into two halves and identifies the peak (maximum value) in each half. + + Parameters: + normalized_data (Series or ndarray): Normalized accelerometer values. + time_seconds (array-like): Corresponding time values. + start_time (float, optional): Start of the interval. + end_time (float, optional): End of the interval. + + Returns: + tuple: (First half peak time, second half peak time, first half index, second half index) + """ if start_time is not None or end_time is not None: mask = (time_seconds >= (start_time or time_seconds.min())) & (time_seconds <= (end_time or time_seconds.max())) filtered_data = normalized_data[mask] @@ -190,61 +248,51 @@ def find_peaks_in_interval(normalized_data, time_seconds, start_time=None, end_t filtered_data = normalized_data filtered_time = time_seconds - # Ensure filtered_time is a Series for indexing filtered_time = pd.Series(filtered_time.values, index=filtered_data.index) - - # Split the interval into two halves mid_point = len(filtered_data) // 2 first_half_data = filtered_data.iloc[:mid_point] second_half_data = filtered_data.iloc[mid_point:] - first_half_time = filtered_time.iloc[:mid_point] second_half_time = filtered_time.iloc[mid_point:] index_first_clap = first_half_data.idxmax() index_last_clap = second_half_data.idxmax() - - # Find the maxima in each half first_half_max_time = first_half_time[index_first_clap] second_half_max_time = second_half_time[index_last_clap] - return first_half_max_time, second_half_max_time, index_first_clap, index_last_clap -# Function to slice and save accelerometer data +def slice_accelerometer_data(metadata_csv, raw_accel_data_dir, output_root): + """ + Slice accelerometer data based on metadata and save the slices to CSV files. + + For each entry in the metadata CSV, loads the corresponding accelerometer data, + slices it between the provided start and end indices, and writes the sliced data + to an output directory. -def slice_accelerometer_data(metadata_csv, raw_accel_data_dir, output_root): + Parameters: + metadata_csv (str): Path to the metadata CSV. + raw_accel_data_dir (str): Directory with raw accelerometer CSV files. + output_root (str): Root directory where sliced data will be saved.
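+
+ Example (illustrative; file and directory names are hypothetical):
+ >>> slice_accelerometer_data("clap_metadata.csv", "raw_acc_data/", "Output/sliced/")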
+ """ df = pd.read_csv(metadata_csv) - for _, row in df.iterrows(): video_id = row["video_id"] watch_filename = row["watch"] start_idx = int(row["index_start_clap"]) end_idx = int(row["index_end_clap"]) - - # Build the full path to the raw accelerometer file acc_data_path = os.path.join(raw_accel_data_dir, f"{watch_filename}.csv") - if not os.path.exists(acc_data_path): print(f"File not found: {acc_data_path}") continue - - # Load and clean the full accelerometer data cleaned_data, _ = load_accelerometer_data(acc_data_path) - - # Slice the data from start_idx to end_idx sliced_data = cleaned_data.loc[start_idx:end_idx] - - # Create output folder for this video video_folder = os.path.join(output_root, video_id) os.makedirs(video_folder, exist_ok=True) - - # Define the output filename output_filename = f"{video_id}_acceleration_data.csv" output_path = os.path.join(video_folder, output_filename) - - # Save sliced data sliced_data.to_csv(output_path, index=False) print(f"Sliced data saved to: {output_path}") diff --git a/src/peak_picker.py b/src/peak_picker.py index 3d9b566..d775b55 100644 --- a/src/peak_picker.py +++ b/src/peak_picker.py @@ -4,29 +4,72 @@ from scipy.signal import find_peaks import matplotlib.pyplot as plt -# Load accelerometer data + def load_accelerometer_data(file_path): - """Loads accelerometer data from CSV.""" + """ + Load accelerometer data from a CSV file. + + Parameters: + file_path (str): Path to the CSV file containing accelerometer data. + + Returns: + DataFrame: The loaded accelerometer data. + """ return pd.read_csv(file_path) -# Normalize accelerometer data def normalize_accelerometer_data(data): - """Computes ENMO (Euclidean Norm Minus One) for accelerometer data.""" + """ + Compute the ENMO (Euclidean Norm Minus One) for accelerometer data. + + This function calculates the Euclidean norm of the X, Y, and Z columns, + subtracts 1 from the result, and sets any negative values to zero. + + Parameters: + data (DataFrame): Accelerometer data with columns "X", "Y", and "Z". + + Returns: + np.ndarray: The normalized acceleration values. + """ norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1 - return np.maximum(norm, 0) # Negative values set to zero + return np.maximum(norm, 0) -# Detect peaks in normalized data def detect_peaks(normalized_data, distance=50, prominence=0.5): - """Detects step peaks in the normalized accelerometer signal.""" + """ + Detect peaks in the normalized accelerometer data. + + Uses the scipy.signal.find_peaks function to locate peaks based on the + provided distance and prominence thresholds. + + Parameters: + normalized_data (array-like): Normalized accelerometer signal. + distance (int): Minimum distance between peaks (default is 50 samples). + prominence (float): Required prominence of peaks (default is 0.5). + + Returns: + ndarray: Indices of the detected peaks. + """ peaks, _ = find_peaks(normalized_data, distance=distance, prominence=prominence) return peaks -# Load ground truth data def load_ground_truth(file_path): - """Loads total step count from ground truth CSV.""" + """ + Load the total step count from a ground truth CSV file. + + Expects a row labeled "Total" in the "Joint" column that contains the total + detected steps. + + Parameters: + file_path (str): Path to the ground truth CSV file. + + Returns: + int: The total number of detected steps. + + Raises: + ValueError: If the "Total" row is not found in the CSV. 
+ """ ground_truth = pd.read_csv(file_path) total_row = ground_truth[ground_truth["Joint"] == "Total"] @@ -37,21 +80,43 @@ def load_ground_truth(file_path): return total_steps -# Compare detected steps to ground truth def compare_with_ground_truth(detected_steps, ground_truth_steps, foot="both"): - """Compares detected peaks with ground truth steps.""" + """ + Compare the detected step count with the ground truth. + + Prints the number of detected steps, ground truth steps, and calculates the + accuracy percentage. + + Parameters: + detected_steps (int): The number of steps detected. + ground_truth_steps (int): The ground truth step count. + foot (str): Indicates which foot's data is being compared ("left", "right", or "both"). + + Returns: + float: The calculated accuracy as a percentage. + """ print(f"{foot.capitalize()} Foot - Detected Steps: {detected_steps}") print(f"Ground Truth Steps: {ground_truth_steps}") accuracy = (detected_steps / ground_truth_steps) * 100 print(f"Accuracy: {accuracy:.2f}%") - return accuracy -# Process all folders def process_folders(root_dir): - """Processes all video session folders.""" + """ + Process all session folders in the root directory for step detection. + + For each folder, this function loads accelerometer data from left and right foot, + normalizes the data, detects peaks, compares the detected step count with the ground + truth, and plots the results. + + Parameters: + root_dir (str): The root directory containing session folders. + + Returns: + None + """ for folder_name in os.listdir(root_dir): folder_path = os.path.join(root_dir, folder_name) if not os.path.isdir(folder_path): # Skip non-directories @@ -59,12 +124,12 @@ def process_folders(root_dir): print(f"\nProcessing folder: {folder_name}") - # Paths to accelerometer data (left & right) and ground truth + # Construct file paths left_accel_file = os.path.join(folder_path, f"{folder_name}_left_acceleration_data.csv") right_accel_file = os.path.join(folder_path, f"{folder_name}_right_acceleration_data.csv") ground_truth_file = os.path.join(folder_path, "scaled_step_counts.csv") - # Check if files exist + # Ensure required files exist if not os.path.exists(left_accel_file) or not os.path.exists(right_accel_file): print(f"Missing accelerometer files in {folder_name}. Skipping.") continue @@ -72,13 +137,12 @@ def process_folders(root_dir): print(f"Missing ground truth file in {folder_name}. 
Skipping.") continue - # Load and process left & right foot accelerometer data + # Load and process data left_data = load_accelerometer_data(left_accel_file) right_data = load_accelerometer_data(right_accel_file) left_norm = normalize_accelerometer_data(left_data) right_norm = normalize_accelerometer_data(right_data) - # Detect peaks for both feet left_peaks = detect_peaks(left_norm) right_peaks = detect_peaks(right_norm) @@ -86,19 +150,16 @@ def process_folders(root_dir): detected_steps_right = len(right_peaks) detected_steps_total = detected_steps_left + detected_steps_right - # Load ground truth ground_truth_steps = load_ground_truth(ground_truth_file) - # Compare results compare_with_ground_truth(detected_steps_left, ground_truth_steps // 2, "left") compare_with_ground_truth(detected_steps_right, ground_truth_steps // 2, "right") compare_with_ground_truth(detected_steps_total, ground_truth_steps, "both") - # Plot results + # Plot the accelerometer signals and detected peaks plt.figure(figsize=(12, 6)) plt.plot(left_norm, label="Left Foot ENMO", color="blue", alpha=0.7) plt.plot(right_norm, label="Right Foot ENMO", color="red", alpha=0.7) - plt.scatter(left_peaks, left_norm[left_peaks], color="green", label="Left Foot Peaks", marker="x") plt.scatter(right_peaks, right_norm[right_peaks], color="orange", label="Right Foot Peaks", marker="o") @@ -109,11 +170,17 @@ def process_folders(root_dir): plt.show() -# Main function def main(): + """ + Main function to process session folders for step detection. + + Sets the root directory path for the dataset and initiates processing of all + session folders. + """ root_dir = "D:\\Step-counter\\Output" # Adjust path to dataset directory process_folders(root_dir) if __name__ == "__main__": main() + diff --git a/src/save_acc_metadata_sliced.py b/src/save_acc_metadata_sliced.py index 403e5c5..4677075 100644 --- a/src/save_acc_metadata_sliced.py +++ b/src/save_acc_metadata_sliced.py @@ -3,11 +3,23 @@ import numpy as np from clap_detection_methods import load_accelerometer_data -# Function to slice and save accelerometer data - def slice_accelerometer_data(metadata_csv, raw_accel_data_dir, output_root): + """ + Slice accelerometer data for each video and save the slices to CSV files. + + Reads a metadata CSV containing video IDs, filenames for left/right watches, + and start/end indices. For each watch file, loads the accelerometer data, + slices it, saves the slice, and records the slice length for later scaling. + + Parameters: + metadata_csv (str): Path to the metadata CSV. + raw_accel_data_dir (str): Directory with raw accelerometer CSV files. + output_root (str): Directory where sliced data will be saved. + Returns: + list: Tuples of (length of sliced data, video folder path) for scaling purposes. 
+ """ df = pd.read_csv(metadata_csv) scaling_data = [] @@ -25,23 +37,14 @@ def slice_accelerometer_data(metadata_csv, raw_accel_data_dir, output_root): print(f"File not found: {acc_data_path}") continue - # Load and clean the full accelerometer data cleaned_data, _ = load_accelerometer_data(acc_data_path) - - # Slice the data from start_idx to end_idx sliced_data = cleaned_data.loc[start_idx:end_idx] - - # Create output folder for this video video_folder = os.path.join(output_root, video_id) os.makedirs(video_folder, exist_ok=True) - - # Define the output filename output_filename = f"{video_id}_{side}_acceleration_data.csv" output_path = os.path.join(video_folder, output_filename) scaling_data.append((len(sliced_data), video_folder)) - - # Save sliced data sliced_data.to_csv(output_path, index=False) print(f"Sliced data for {side} watch saved to: {output_path}") @@ -49,6 +52,16 @@ def slice_accelerometer_data(metadata_csv, raw_accel_data_dir, output_root): def scale_stepcounts_data(scaling_data): + """ + Scale step counts based on the length of the sliced accelerometer data. + + For each video folder, reads the step count data and raw video data to compute a + scaling factor. Then updates the "Peaks" values in the step counts by applying this factor, + and saves the scaled data. + + Parameters: + scaling_data (list): List of tuples (length of sliced data, video folder path). + """ for len_acc_data, video_folder in scaling_data: data_path = os.path.join(video_folder, "step_counts.csv") data = pd.read_csv(data_path)