Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 64 additions & 48 deletions cnn/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,38 @@
from torch.utils.data import Dataset, DataLoader
import ast


def compute_enmo(data):
    """Compute the Euclidean Norm Minus One (ENMO) for accelerometer data.

    Args:
        data: Mapping/DataFrame with numeric "X", "Y", "Z" acceleration
            columns (presumably in units of g — TODO confirm with the
            recording pipeline, since the "-1" removes 1 g of gravity).

    Returns:
        Element-wise ENMO values: sqrt(X^2 + Y^2 + Z^2) - 1, clipped at 0
        (negative values carry no movement information).
    """
    norm = np.sqrt(data["X"] ** 2 + data["Y"] ** 2 + data["Z"] ** 2) - 1
    return np.maximum(norm, 0)

class StepCounterDataset(Dataset):
    """Sliding-window dataset over dual-foot accelerometer data.

    Each sample is a window of scaled per-foot ENMO differences paired with
    a (window_size, 7) label array: column 0 is the per-sample step label,
    columns 1..6 carry the (constant) 6-dimensional gait vector.
    """

    def __init__(self, left_data, right_data, step_counts, window_size,
                 gait_vector=None):
        """
        Args:
            left_data, right_data: Accelerometer DataFrames with X/Y/Z columns.
            step_counts: DataFrame with "Joint" and "Peaks" columns, where
                "Peaks" holds a string-encoded list of peak sample indices.
            window_size: Length of each window in samples.
            gait_vector: Optional array-like of length 6 (e.g. [0,1,0,1,0,0])
                with gait labels for the whole recording. Defaults to all
                zeros, keeping older callers without gait info working.
        """
        self.window_size = window_size

        # --- ENMO, first differences, normalization ---
        left_data["ENMO"] = compute_enmo(left_data)
        right_data["ENMO"] = compute_enmo(right_data)

        # First difference emphasizes changes; fillna(0) keeps row 0 defined.
        left_data["ENMO_DIFF"] = left_data["ENMO"].diff().fillna(0)
        right_data["ENMO_DIFF"] = right_data["ENMO"].diff().fillna(0)

        # Two channels: left-foot and right-foot ENMO difference.
        self.data = np.hstack((
            left_data[["ENMO_DIFF"]].values,
            right_data[["ENMO_DIFF"]].values,
        ))

        # Fit the scaler on this recording; kept on self for reuse/inverse.
        self.scaler = StandardScaler()
        self.data = self.scaler.fit_transform(self.data)

        # --- Extract step labels ---
        def extract_peaks(peaks_str):
            # "Peaks" is stored as a string like "[12, 57, 103]" in the CSV;
            # parse it safely, returning [] on anything malformed.
            if isinstance(peaks_str, str):
                try:
                    return ast.literal_eval(peaks_str)
                except (ValueError, SyntaxError):
                    return []
            return []

        left_peaks = extract_peaks(
            step_counts.loc[step_counts["Joint"] == "left_foot_index", "Peaks"].values[0])
        right_peaks = extract_peaks(
            step_counts.loc[step_counts["Joint"] == "right_foot_index", "Peaks"].values[0])

        # Step label: 0 or 1 per sample.
        self.step_labels = np.zeros(len(self.data), dtype=np.float32)

        # Shift each peak by half a window so the positive label lands near
        # the center of the windows that contain it (helps the CNN).
        for p in left_peaks + right_peaks:
            if 0 <= p < len(self.step_labels) - (window_size // 2):
                self.step_labels[p + (window_size // 2)] = 1

        # --- Gait label: 6-dimensional vector ---
        # The gait is constant for the whole recording; store once and
        # repeat per window in __getitem__.
        if gait_vector is None:
            gait_vector = np.zeros(6, dtype=np.float32)
        self.gait_label = np.asarray(gait_vector, dtype=np.float32)  # shape (6,)

    def __len__(self):
        # Number of full windows that fit in the recording.
        return len(self.data) - self.window_size

    def __getitem__(self, idx):
        # x shape: (window_size, 2)
        x = self.data[idx : idx + self.window_size]

        # Per-sample step label for this window, shape (window_size,).
        y_step = self.step_labels[idx : idx + self.window_size]

        # Gait label repeated for every sample in the window: (window_size, 6).
        y_gait = np.tile(self.gait_label, (self.window_size, 1))

        # Combined label (window_size, 7): col 0 = step, cols 1..6 = gait.
        y = np.zeros((self.window_size, 7), dtype=np.float32)
        y[:, 0] = y_step
        y[:, 1:] = y_gait

        # Light Gaussian-noise augmentation (applied on every access).
        noise = np.random.normal(0, 0.02, x.shape)
        x_augmented = x + noise

        return x_augmented, y


def load_datasets(folder_path, window_size, batch_size, gait_info_df=None):
    """Build a DataLoader of windowed step/gait samples for one recording.

    Loads from `folder_path`:
      - (folder name)_left_acceleration_data.csv
      - (folder name)_right_acceleration_data.csv
      - scaled_step_counts.csv
    and looks up the folder's gait label row in `gait_info_df` (matched on
    its "video_id" column). When `gait_info_df` is None or has no matching
    row, an all-zero gait label is used.

    Returns:
        A shuffling DataLoader over a StepCounterDataset, or None when any
        required file is missing or empty.
    """
    folder_name = os.path.basename(folder_path)
    left_file = os.path.join(folder_path, f"{folder_name}_left_acceleration_data.csv")
    right_file = os.path.join(folder_path, f"{folder_name}_right_acceleration_data.csv")
    step_file = os.path.join(folder_path, "scaled_step_counts.csv")

    # Skip the recording entirely if any input file is absent.
    if not (os.path.exists(left_file) and os.path.exists(right_file) and os.path.exists(step_file)):
        print(f"Folder {folder_name}: Missing files, skipping.")
        return None

    # Load CSVs
    left_data = pd.read_csv(left_file)
    right_data = pd.read_csv(right_file)
    step_counts = pd.read_csv(step_file)

    # Empty inputs cannot produce windows; skip them too.
    if left_data.empty or right_data.empty or step_counts.empty:
        print(f"Folder {folder_name}: Empty data, skipping.")
        return None

    # --- Get gait label from gait_info_df (video_id == folder name) ---
    gait_label = np.zeros(6, dtype=np.float32)
    if gait_info_df is not None:
        row = gait_info_df[gait_info_df["video_id"] == folder_name]
        if row.empty:
            print(f"No row found for {folder_name} in gait_info_df, using 0-label.")
        else:
            # Important: column order here must match the CSV schema.
            gait_label = row[["langsames_gehen", "normales_gehen", "laufen",
                              "frei_mitschwingend", "links_in_ht", "rechts_in_ht"]].values[0]
            # => shape (6,)

    dataset = StepCounterDataset(left_data, right_data, step_counts,
                                 window_size, gait_vector=gait_label)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)
90 changes: 31 additions & 59 deletions cnn/model_step_counter.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,48 @@
import torch.nn as nn
import torch.nn.functional as F


class StepCounterCNN(nn.Module):
    """1-D CNN that predicts step presence plus a 6-way gait encoding.

    Input is a two-channel (left/right foot) window of accelerometer-derived
    features; output is 7 sigmoid activations: index 0 = step probability,
    indices 1..6 = gait-type labels.
    """

    def __init__(self, window_size):
        """
        Args:
            window_size (int): Length of the input time-series window.
                Must be divisible by 4, since pooling and the strided
                residual block each halve the temporal dimension once.
        """
        super().__init__()
        # Stem: 2 input channels -> 32 feature maps, then halve the length.
        self.conv1 = nn.Conv1d(2, 32, kernel_size=7, padding=3)
        self.bn1 = nn.BatchNorm1d(32)
        self.pool = nn.MaxPool1d(3, stride=2, padding=1)

        self.resblock1 = self._make_resblock(32, 64)
        self.resblock2 = self._make_resblock(64, 128, stride=2)

        # Length after one pool (/2) and one strided block (/2).
        final_length = window_size // 4
        self.fc1 = nn.Linear(128 * final_length, 64)

        # 7 outputs: 1 step label + 6 gait-type labels.
        self.fc2 = nn.Linear(64, 7)
        self.sigmoid = nn.Sigmoid()
        self.dropout = nn.Dropout(0.5)

    def _make_resblock(self, in_channels, out_channels, stride=1):
        """Two conv-BN-ReLU stages followed by dropout.

        NOTE(review): despite the name there is no skip connection here —
        this is a plain sequential block.

        Args:
            in_channels (int): Input channel count.
            out_channels (int): Output channel count.
            stride (int): Stride of the first convolution (temporal
                downsampling factor).

        Returns:
            nn.Sequential: The block.
        """
        return nn.Sequential(
            nn.Conv1d(in_channels, out_channels, 3, stride=stride, padding=1),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
            nn.Conv1d(out_channels, out_channels, 3, padding=1),
            nn.BatchNorm1d(out_channels),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, 2, window_size).

        Returns:
            Tensor of shape (batch_size, 7) with sigmoid activations.
        """
        x = self.pool(F.relu(self.bn1(self.conv1(x))))
        x = self.resblock1(x)
        x = self.resblock2(x)

        x = x.flatten(1)  # (batch_size, 128 * final_length)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return self.sigmoid(x)
Loading