Skip to content
75 changes: 75 additions & 0 deletions backend/backend/ecg_app/Ai/difficulty_clustering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from django.db.models import Count, Avg, Q
from ecg_app.models import EcgSamples, QuestionAttempt
from sklearn.cluster import KMeans
import pandas as pd


def recalculate_difficulty(n_clusters=5, min_attempts=5):
"""
Recalculates difficulty level for each ECG sample using KMeans clustering.
Difficulty is based on:
- Error rate (1 - accuracy)
- Average response time
Only samples with at least `min_attempts` attempts are included.
"""

print("📊 Extracting features for ECG samples...")

# Aggregate performance data per ECG sample via QuestionAttempt
samples = (
QuestionAttempt.objects.values('question__ecg_sample')
.annotate(
total=Count('id'),
correct=Count('id', filter=Q(is_correct=True)),
avg_time=Avg('response_time')
)
.filter(total__gte=min_attempts)
)

if not samples:
print("⚠️ Not enough data to run KMeans.")
return

data = []
ids = []

for s in samples:
accuracy = s['correct'] / s['total']
time = s['avg_time'] or 0
sample_id = s['question__ecg_sample']
ids.append(sample_id)
data.append([1 - accuracy, time]) # Features: [error_rate, avg_time]

features = pd.DataFrame(data, columns=['error_rate', 'avg_time'])

print(f"🔍 Clustering {len(ids)} samples using KMeans...")

# Apply KMeans clustering
model = KMeans(n_clusters=n_clusters, random_state=42)
clusters = model.fit_predict(features)

# Rank clusters by difficulty (error_rate + avg_time)
features['cluster'] = clusters
features['sample_id'] = ids
cluster_difficulty = (
features.groupby('cluster')[['error_rate', 'avg_time']]
.mean()
.assign(difficulty_score=lambda df: df['error_rate'] + df['avg_time'])
.sort_values('difficulty_score')
)

cluster_to_difficulty_level = {
cluster_id: rank for rank, cluster_id in enumerate(cluster_difficulty.index)
}
# Save difficulty level to the database
updated = 0
for i, sample_id in enumerate(ids):
try:
sample = EcgSamples.objects.get(sample_id=sample_id) # use sample_id (custom PK)
sample.difficulty_level = cluster_to_difficulty_level[clusters[i]]
sample.save()
updated += 1
except EcgSamples.DoesNotExist:
continue

print(f"✅ Difficulty levels updated for {updated} samples.")
55 changes: 55 additions & 0 deletions backend/backend/ecg_app/Ai/student_level_estimation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import numpy as np
from django.contrib.auth.models import User
from ecg_app.models import QuizAttempt, QuestionAttempt, Profile, EcgDocLabels, StudentTagMastery

def recalculate_student_levels():
print("🧠 Running student level estimation...")

# Step 1: Collect training data
X_train, y_train = [], []
user_features = {}

for user in User.objects.all():
attempts = QuestionAttempt.objects.filter(quiz_attempt__user=user)
if not attempts.exists():
continue

avg_response_time = attempts.aggregate(avg_time=models.Avg('response_time'))['avg_time'] or 0
correct_ratio = attempts.aggregate(correct_avg=models.Avg(models.Case(
models.When(is_correct=True, then=1),
default=0,
output_field=models.FloatField()
)))['correct_avg'] or 0

total_attempts = attempts.count()

# For training: check if this user already has a labeled level
profile = getattr(user, 'profile', None)
if profile and profile.student_level is not None:
X_train.append([avg_response_time, correct_ratio, total_attempts])
y_train.append(profile.student_level)

user_features[user.id] = [avg_response_time, correct_ratio, total_attempts]

if not X_train:
print("⚠️ No training data available.")
return

# Step 2: Train model
model = Pipeline([
('scaler', StandardScaler()),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
model.fit(X_train, y_train)

# Step 3: Predict levels for all users
for user_id, features in user_features.items():
predicted_level = model.predict([features])[0]
profile = Profile.objects.get(user__id=user_id)
profile.student_level = predicted_level
profile.save()

print("✅ Student levels updated successfully.")
72 changes: 72 additions & 0 deletions backend/backend/ecg_app/Ai/tag_mastery_estimation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from django.db.models import Avg, Count, Q
from ecg_app.models import QuestionAttempt, EcgSamplesDocLabels, StudentTagMastery
from sklearn.ensemble import RandomForestRegressor
import pandas as pd


def recalculate_tag_mastery():
"""
Calculates and updates tag-specific mastery levels for each student.
Uses a regression model to estimate mastery as a percentage (0-100).
"""
print("📊 Gathering question attempt data for tag-based mastery...")

# Step 1: Gather relevant data
attempts = (
QuestionAttempt.objects
.filter(question__ecg_sample__doc_labels__isnull=False)
.values('quiz_attempt__user', 'question__ecg_sample__doc_labels')
.annotate(
avg_correct=Avg('is_correct'),
avg_time=Avg('response_time'),
total=Count('id')
)
.filter(total__gte=3)
)

if not attempts:
print("⚠️ Not enough data to train the model.")
return

# Step 2: Build dataset
rows = []
for row in attempts:
rows.append({
'user_id': row['quiz_attempt__user'],
'tag_id': row['question__ecg_sample__doc_labels'],
'avg_correct': float(row['avg_correct']),
'avg_time': float(row['avg_time']),
'total_attempts': row['total']
})

df = pd.DataFrame(rows)

# Normalize input values
df['avg_time'] = df['avg_time'].fillna(df['avg_time'].mean())
df['avg_correct'] = df['avg_correct'].fillna(0)

# Step 3: Train a regression model to simulate mastery percentage
df['mastery_percent'] = df['avg_correct'] * 100 # Simple assumption

features = df[['avg_correct', 'avg_time', 'total_attempts']]
target = df['mastery_percent']

model = RandomForestRegressor(random_state=42)
model.fit(features, target)
df['predicted_mastery'] = model.predict(features)

print("🤖 Model trained and predictions generated.")

# Step 4: Save mastery values to DB
updated = 0
for _, row in df.iterrows():
mastery_value = min(100.0, max(0.0, float(row['predicted_mastery']))) # Ensure in 0-100 range

StudentTagMastery.objects.update_or_create(
user_id=row['user_id'],
tag_id=row['tag_id'],
defaults={'mastery_score': mastery_value}
)
updated += 1

print(f"✅ Updated mastery scores for {updated} student-tag pairs.")
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from django.core.management.base import BaseCommand
from ecg_app.Ai.difficulty_clustering import recalculate_difficulty

class Command(BaseCommand):
help = 'Recalculate difficulty level for each ECG sample using KMeans clustering'

def handle(self, *args, **kwargs):
recalculate_difficulty()
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from django.core.management.base import BaseCommand
from ecg_app.Ai.student_level_estimation import recalculate_student_levels

class Command(BaseCommand):
help = 'Recalculate student levels using ML model based on past quiz attempts'

def handle(self, *args, **options):
self.stdout.write("🔄 Running student level estimation...")
try:
recalculate_student_levels()
self.stdout.write(self.style.SUCCESS("✅ Student levels updated successfully."))
except Exception as e:
self.stdout.write(self.style.ERROR(f"❌ Error occurred: {str(e)}"))
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from django.core.management.base import BaseCommand
from ecg_app.Ai.tag_mastery_estimation import recalculate_tag_mastery

class Command(BaseCommand):
help = 'Recalculate student mastery per ECG tag using regression model'

def handle(self, *args, **kwargs):
self.stdout.write(self.style.NOTICE("🔄 Starting tag-based mastery recalculation..."))
recalculate_tag_mastery()
self.stdout.write(self.style.SUCCESS("✅ Done. Mastery values updated."))
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.1.3 on 2025-07-05 13:08

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('ecg_app', '0010_alter_ecgsamplevalidation_options_and_more'),
]

operations = [
migrations.AddField(
model_name='ecgsamples',
name='difficulty_level',
field=models.IntegerField(blank=True, null=True),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.2.4 on 2025-07-08 08:44

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('ecg_app', '0011_ecgsamples_difficulty_level'),
]

operations = [
migrations.AddField(
model_name='questionattempt',
name='response_time',
field=models.FloatField(blank=True, null=True),
),
]
18 changes: 18 additions & 0 deletions backend/backend/ecg_app/migrations/0013_profile_student_level.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.2.4 on 2025-07-17 18:42

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('ecg_app', '0012_questionattempt_response_time'),
]

operations = [
migrations.AddField(
model_name='profile',
name='student_level',
field=models.IntegerField(blank=True, null=True),
),
]
28 changes: 28 additions & 0 deletions backend/backend/ecg_app/migrations/0014_studenttagmastery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 5.2.4 on 2025-07-21 12:13

import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('ecg_app', '0013_profile_student_level'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]

operations = [
migrations.CreateModel(
name='StudentTagMastery',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('mastery_score', models.FloatField(blank=True, null=True)),
('tag', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='student_masteries', to='ecg_app.ecgdoclabels')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='tag_masteries', to=settings.AUTH_USER_MODEL)),
],
options={
'unique_together': {('user', 'tag')},
},
),
]
14 changes: 14 additions & 0 deletions backend/backend/ecg_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class EcgSamples(models.Model):
null=True
)
age = models.PositiveIntegerField(blank=True, null=True)
difficulty_level = models.IntegerField(null=True, blank=True)


def __str__(self):
Expand Down Expand Up @@ -108,6 +109,7 @@ class Profile(models.Model):
choices=[('student', 'Student'), ('teacher', 'Teacher')],
default='student'
)
student_level = models.IntegerField(null=True, blank=True)

def __str__(self):
return self.user.username
Expand Down Expand Up @@ -177,6 +179,7 @@ class QuestionAttempt(models.Model):
question = models.ForeignKey(Question, on_delete=models.CASCADE)
selected_choice = models.ForeignKey(Choice, on_delete=models.SET_NULL, blank=True, null=True)
is_correct = models.BooleanField(default=False)
response_time = models.FloatField(null=True, blank=True)

def __str__(self):
return f"Attempt for Question {self.question.id} in {self.quiz_attempt}"
Expand Down Expand Up @@ -215,3 +218,14 @@ class Meta:

def __str__(self):
return f"{self.student.username} - {self.group.name} ({self.status})"

class StudentTagMastery(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='tag_masteries')
tag = models.ForeignKey(EcgDocLabels, on_delete=models.CASCADE, related_name='student_masteries')
mastery_score = models.FloatField(null=True, blank=True)

class Meta:
unique_together = ('user', 'tag')

def __str__(self):
return f"{self.user.username} - {self.tag.label_desc} - Score: {self.mastery_score}"
4 changes: 3 additions & 1 deletion backend/backend/ecg_app/views/quiz.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def create(self, request, *args, **kwargs):
for answer in answers:
question_id = answer.get('question')
choice_id = answer.get('selected_choice')
response_time = answer.get('response_time')

try:
question = Question.objects.get(id=question_id, quiz=quiz)
Expand All @@ -155,7 +156,8 @@ def create(self, request, *args, **kwargs):
quiz_attempt=quiz_attempt,
question=question,
selected_choice=choice,
is_correct=is_correct
is_correct=is_correct,
response_time=response_time
)
except (Question.DoesNotExist, Choice.DoesNotExist):
continue
Expand Down
3 changes: 2 additions & 1 deletion backend/backend/ecg_backend/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
SECRET_KEY = os.getenv('DJANGO_SECRET_KEY')

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = os.getenv('DEBUG').lower() == 'true'
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'

PROD = not DEBUG

if PROD:
Expand Down
Loading