Binary file added cnn/best_model.pth
Binary file not shown.
44 changes: 29 additions & 15 deletions cnn/data_generator.py
@@ -7,9 +7,9 @@


def compute_enmo(data):
# calculate ENMNO for data
# Calculate the ENMO value for the data
norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
return np.maximum(norm, 0) # Negative Werte auf 0 setzen
return np.maximum(norm, 0) # Set negative values to 0
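
For intuition: ENMO is max(‖(X, Y, Z)‖ − 1, 0) in units of g, so a sensor at rest reading (0, 0, 1) yields 0 and only acceleration in excess of gravity survives. A minimal sketch of the function's behavior (the sample values are hypothetical, not from this dataset):

import numpy as np
import pandas as pd

sample = pd.DataFrame({"X": [0.0, 0.3], "Y": [0.0, 0.4], "Z": [1.0, 1.2]})
print(compute_enmo(sample))  # rest row -> 0.0; moving row -> sqrt(1.69) - 1 = 0.3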


class StepCounterDataset(Dataset):
@@ -20,13 +20,14 @@ def __init__(self, left_data, right_data, step_counts, window_size):
left_data["ENMO"] = compute_enmo(left_data)
right_data["ENMO"] = compute_enmo(right_data)

# Calculate the difference in ENMO values
left_data["ENMO_DIFF"] = left_data["ENMO"].diff().fillna(0)
right_data["ENMO_DIFF"] = right_data["ENMO"].diff().fillna(0)

# ENMO compare for data
# Stack the ENMO differences for both feet
self.data = np.hstack((left_data[["ENMO_DIFF"]], right_data[["ENMO_DIFF"]]))

# Normalize data
# Normalize the data
self.scaler = StandardScaler()
self.data = self.scaler.fit_transform(self.data)

@@ -39,26 +40,29 @@ def extract_peaks(peaks_str):
return []
return []

# Extract peaks for left and right feet
left_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values[0])
right_peaks = extract_peaks(step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values[0])

# Create labels
# Create step labels
self.step_labels = np.zeros(len(self.data), dtype=np.float32)

# Shift step labels so CNN learns peak positions better
# Shift each peak label forward by half a window so it lands at the center of the windows the CNN trains on
for p in left_peaks + right_peaks:
if 0 <= p < len(self.step_labels) - (window_size // 2):
self.step_labels[p + (window_size // 2)] = 1

# Debugging information
print("\n==== Debugging Step Extraction ====")
print("Step count dataset (first few rows):")
print("Step data (first few rows):")
print(step_counts.head())

print("\nLeft foot peak extraction:")
print("Raw string from CSV:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values)
print("\nExtraction of peaks for the left foot:")
print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values)
print("Extracted peaks:", left_peaks)

print("\nRight foot peak extraction:")
print("Raw string from CSV:", step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values)
print("\nExtraction of peaks for the right foot:")
print("Raw data from CSV:", step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values)
print("Extracted peaks:", right_peaks)

print("\nTotal peaks found: Left =", len(left_peaks), ", Right =", len(right_peaks))
@@ -70,32 +74,42 @@ def __len__(self):
def __getitem__(self, idx):
x = self.data[idx : idx + self.window_size]
y = self.step_labels[idx : idx + self.window_size]
return x, y

# Data augmentation: Add slight noise to the data
noise = np.random.normal(0, 0.02, x.shape)
x_augmented = x + noise

return x_augmented, y


def load_datasets(folder_path, window_size, batch_size):
"""
Reads (foldername)_left_acceleration_data.csv,
(foldername)_right_acceleration_data.csv,
scaled_step_counts.csv
Reads the following files:
(Folder name)_left_acceleration_data.csv,
(Folder name)_right_acceleration_data.csv,
scaled_step_counts.csv
and creates a DataLoader with segments.
"""
folder_name = os.path.basename(folder_path)
left_file = os.path.join(folder_path, f"{folder_name}_left_acceleration_data.csv")
right_file = os.path.join(folder_path, f"{folder_name}_right_acceleration_data.csv")
step_file = os.path.join(folder_path, "scaled_step_counts.csv")

# Check if all required files exist
if not (os.path.exists(left_file) and os.path.exists(right_file) and os.path.exists(step_file)):
print(f"Folder {folder_name}: Missing files, skipping.")
return None

# Load data from CSV files
left_data = pd.read_csv(left_file)
right_data = pd.read_csv(right_file)
step_counts = pd.read_csv(step_file)

# Check if any of the dataframes are empty
if left_data.empty or right_data.empty or step_counts.empty:
print(f"Folder {folder_name}: Empty data, skipping.")
return None

# Create dataset and DataLoader
dataset = StepCounterDataset(left_data, right_data, step_counts, window_size)
return DataLoader(dataset, batch_size=batch_size, shuffle=True)
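
For reference, a minimal usage sketch of load_datasets (the folder path and hyperparameters are illustrative assumptions, not taken from this PR):

from data_generator import load_datasets

loader = load_datasets("Output/train/001", window_size=64, batch_size=32)
if loader is not None:  # None means files were missing or empty
    x, y = next(iter(loader))
    print(x.shape, y.shape)  # (32, 64, 2) windows and (32, 64) per-frame labels

Note that the CNN below expects channels first, so the training loop presumably transposes these windows to (batch, 2, 64).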
101 changes: 68 additions & 33 deletions cnn/model_step_counter.py
@@ -1,41 +1,76 @@
import torch.nn as nn
import torch.nn.functional as F


class StepCounterCNN(nn.Module):
def __init__(self, window_size):
"""
Initializes the StepCounterCNN model.

Args:
window_size (int): The size of the input window for the time series data.
"""
super().__init__()
self.conv1 = nn.Conv1d(2, 32, kernel_size=5, padding=2)
self.relu1 = nn.ReLU()
self.pool1 = nn.MaxPool1d(kernel_size=2)
self.batch_norm1 = nn.BatchNorm1d(32)

self.conv2 = nn.Conv1d(32, 64, kernel_size=5, padding=2)
self.relu2 = nn.ReLU()
self.pool2 = nn.MaxPool1d(kernel_size=2)
self.batch_norm2 = nn.BatchNorm1d(64)

fc1_input_size = (window_size // 4) * 64
self.fc1 = nn.Linear(fc1_input_size, 128)
self.relu3 = nn.ReLU()
self.fc2 = nn.Linear(128, 1)
self.sigmoid = nn.Sigmoid()
self.dropout = nn.Dropout(0.3)

# First convolutional layer
self.conv1 = nn.Conv1d(2, 32, kernel_size=7, padding=3) # Input channels: 2, Output channels: 32
self.bn1 = nn.BatchNorm1d(32) # Batch normalization for the first layer
self.pool = nn.MaxPool1d(3, stride=2, padding=1) # Max pooling layer

# Residual blocks
self.resblock1 = self._make_resblock(32, 64) # First residual block
self.resblock2 = self._make_resblock(64, 128, stride=2) # Second residual block with stride 2

# Fully Connected Layers
final_length = window_size // 4 # Calculate the final length after pooling and residual blocks
self.fc1 = nn.Linear(128 * final_length, 64) # First fully connected layer
self.fc2 = nn.Linear(64, 1) # Second fully connected layer
self.sigmoid = nn.Sigmoid() # Sigmoid activation for binary classification
self.dropout = nn.Dropout(0.5) # Dropout for regularization

def _make_resblock(self, in_channels, out_channels, stride=1):
"""
Builds a block of two convolutional layers, each followed by batch normalization and ReLU, with dropout at the end. Note that there is no skip connection, so the block is "residual" in name only.

Args:
in_channels (int): Number of input channels.
out_channels (int): Number of output channels.
stride (int): Stride for the first convolutional layer.

Returns:
nn.Sequential: A sequential container representing the block.
"""
return nn.Sequential(
nn.Conv1d(in_channels, out_channels, 3, stride=stride, padding=1), # First convolutional layer
nn.BatchNorm1d(out_channels), # Batch normalization
nn.ReLU(), # ReLU activation
nn.Conv1d(out_channels, out_channels, 3, padding=1), # Second convolutional layer
nn.BatchNorm1d(out_channels), # Batch normalization
nn.ReLU(), # ReLU activation
nn.Dropout(0.5), # Dropout for regularization
)

def forward(self, x):
x = self.conv1(x)
x = self.relu1(x)
x = self.pool1(x)
x = self.batch_norm1(x)

x = self.conv2(x)
x = self.relu2(x)
x = self.pool2(x)
x = self.batch_norm2(x)

x = x.flatten(start_dim=1)
x = self.fc1(x)
x = self.relu3(x)
x = self.dropout(x)
x = self.fc2(x)
x = self.sigmoid(x)
return x
"""
Defines the forward pass of the model.

Args:
x (torch.Tensor): Input tensor of shape (batch_size, 2, window_size).

Returns:
torch.Tensor: Output tensor of shape (batch_size, 1) after applying the sigmoid function.
"""
# Initial layer
x = self.pool(F.relu(self.bn1(self.conv1(x)))) # Apply convolution, batch norm, ReLU, and pooling

# Residual blocks
x = self.resblock1(x) # Apply first residual block
x = self.resblock2(x) # Apply second residual block

# Classification
x = x.flatten(1) # Flatten the tensor for the fully connected layer
x = self.fc1(x) # Apply first fully connected layer
x = F.relu(x) # Apply ReLU activation
x = self.dropout(x) # Apply dropout
x = self.fc2(x) # Apply second fully connected layer
return self.sigmoid(x) # Apply sigmoid activation for binary classification
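
A quick shape check for the revised architecture, as a sketch (batch size and window size are arbitrary; 64 matches the default window used in prediction.py below):

import torch
from model_step_counter import StepCounterCNN

model = StepCounterCNN(window_size=64)
dummy = torch.randn(8, 2, 64)  # (batch, left/right channels, window)
print(model(dummy).shape)  # torch.Size([8, 1]): one sigmoid probability per window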
121 changes: 121 additions & 0 deletions cnn/prediction.py
@@ -0,0 +1,121 @@
import torch
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from scipy.signal import find_peaks
from sklearn.preprocessing import StandardScaler
import ast
from model_step_counter import StepCounterCNN


def load_model(model_path, device, window_size=64):
"""Loads the trained model."""
model = StepCounterCNN(window_size)
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()
return model


def compute_enmo(data):
"""Computes the Euclidean Norm Minus One (ENMO) from accelerometer data."""
norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
return np.maximum(norm, 0)


def process_data(left_csv, right_csv):
"""Loads and processes acceleration data from left and right foot CSV files."""
left_df = pd.read_csv(left_csv)
right_df = pd.read_csv(right_csv)

return pd.DataFrame({"ENMO_left": compute_enmo(left_df), "ENMO_right": compute_enmo(right_df)})


def detect_steps(model, device, data, window_size=64):
"""Runs the step detection model on the given data."""
data = torch.tensor(StandardScaler().fit_transform(data), dtype=torch.float32, device=device)
frame_probs = np.zeros(len(data), dtype=np.float32)
overlap_cnt = np.zeros(len(data), dtype=np.float32)

with torch.no_grad():
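# Slide the window one frame at a time: each frame is covered by up to
# window_size overlapping windows, so per-window probabilities are
# accumulated here and averaged by the coverage count afterwards.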
for start in range(len(data) - window_size):
window = data[start : start + window_size].T.unsqueeze(0)
frame_probs[start : start + window_size] += model(window).cpu().numpy().flatten()
overlap_cnt[start : start + window_size] += 1

frame_probs[overlap_cnt > 0] /= overlap_cnt[overlap_cnt > 0]
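# Peak-pick the averaged trace; height, distance, and prominence act as
# detection thresholds (the specific values appear to be hand-tuned).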
return find_peaks(frame_probs, height=0.02, distance=30, prominence=0.05)[0]


def parse_groundtruth_steps(groundtruth_csv):
"""Parses the ground truth step data from CSV."""
groundtruth_df = pd.read_csv(groundtruth_csv, nrows=2) # Only consider the first two rows
steps = set()
for peak_str in groundtruth_df["Peaks"].dropna():
try:
steps.update(ast.literal_eval(peak_str))
except (SyntaxError, ValueError):
continue
return steps


def plot_results(data, detected_steps, groundtruth_steps):
"""Generates an interactive Plotly visualization of acceleration data, detected steps, and ground truth."""
fig = go.Figure()
time_axis = np.arange(len(data))

# Plot acceleration data
for col in data.columns:
fig.add_trace(go.Scatter(x=time_axis, y=data[col], mode="lines", name=col))

# Plot detected steps
fig.add_trace(
go.Scatter(
x=list(detected_steps),
y=[data.iloc[i].mean() for i in detected_steps],
mode="markers",
name=f"Detected Steps ({len(detected_steps)})",
marker=dict(color="red", size=8),
)
)

# Plot ground truth steps
fig.add_trace(
go.Scatter(
x=list(groundtruth_steps),
y=[data.iloc[i].mean() for i in groundtruth_steps],
mode="markers",
name=f"Ground Truth Steps ({len(groundtruth_steps)})",
marker=dict(color="green", symbol="x", size=8),
)
)

fig.update_layout(
title="Step Detection Visualization",
xaxis_title="Frame",
yaxis_title="Acceleration / Probability",
legend_title="Legend",
template="plotly_white",
)

fig.show()


def main(model_path, left_csv, right_csv, groundtruth_csv):
"""Runs the full step detection pipeline and visualization."""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = load_model(model_path, device)
data = process_data(left_csv, right_csv)
detected_steps = detect_steps(model, device, data)
groundtruth_steps = parse_groundtruth_steps(groundtruth_csv)

plot_results(data, detected_steps, groundtruth_steps)


if __name__ == "__main__":
model_path = "D:/Daisy/5. Semester/SmartHealth/Step-counter/cnn/best_model.pth"
left_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_left_acceleration_data.csv"
right_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/005_right_acceleration_data.csv"
groundtruth_csv = "D:/Daisy/5. Semester/SmartHealth/Step-counter/Output/processed_sliced_and_scaled data/test/005/scaled_step_counts.csv"

main(model_path, left_csv, right_csv, groundtruth_csv)
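
The absolute paths above are machine-specific; a more portable variant could resolve them from a base directory instead (a sketch assuming the same repository layout):

from pathlib import Path

base = Path("Output/processed_sliced_and_scaled data/test/005")
main(
    model_path="cnn/best_model.pth",
    left_csv=base / "005_left_acceleration_data.csv",
    right_csv=base / "005_right_acceleration_data.csv",
    groundtruth_csv=base / "scaled_step_counts.csv",
)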