Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified Backend/__pycache__/main.cpython-313.pyc
Binary file not shown.
Binary file modified Backend/__pycache__/main.cpython-314.pyc
Binary file not shown.
Binary file modified Backend/app/__pycache__/database.cpython-313.pyc
Binary file not shown.
Binary file modified Backend/app/__pycache__/database.cpython-314.pyc
Binary file not shown.
Binary file modified Backend/app/api/routes/__pycache__/feedback.cpython-313.pyc
Binary file not shown.
Binary file modified Backend/app/api/routes/__pycache__/feedback.cpython-314.pyc
Binary file not shown.
Binary file modified Backend/app/api/routes/__pycache__/module.cpython-313.pyc
Binary file not shown.
Binary file not shown.
Binary file modified Backend/app/api/routes/__pycache__/student.cpython-313.pyc
Binary file not shown.
Binary file not shown.
178 changes: 157 additions & 21 deletions Backend/app/api/routes/feedback.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from fastapi import APIRouter, Depends, HTTPException, Query, BackgroundTasks
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from sqlalchemy import func
from uuid import UUID
import asyncio
import json
import logging
import time
from datetime import datetime, timezone

from app.database import get_db
from app.database import get_db, SessionLocal
from app.models.ai_feedback import AIFeedback
from app.models.student_answer import StudentAnswer
from app.models.feedback_job import FeedbackJob
from app.crud.ai_feedback import (
get_feedback_by_answer,
check_and_mark_timeout,
Expand All @@ -18,6 +24,141 @@
logger = logging.getLogger(__name__)


@router.get("/stream/{module_id}")
async def stream_feedback_progress(
    module_id: UUID,
    request: Request,
    student_id: str = Query(..., description="Student ID"),
    attempt: int = Query(1, description="Attempt number", ge=1),
):
    """
    SSE endpoint for real-time feedback generation progress.
    Pushes events as feedback completes instead of requiring polling.

    Events:
    - progress: {ready, total, failed, percentage, pending_jobs} — when the
      completed-feedback count changes (including the first snapshot)
    - complete: {ready, total, failed} — when all feedback is done
    - timeout: sent once if the stream exceeds MAX_DURATION
    - heartbeat: {elapsed} — keep-alive roughly every 15s
    - error: {message} — on unexpected server-side failure

    Max duration: 6 minutes (matches frontend timeout).
    """

    async def event_generator():
        MAX_DURATION = 6 * 60        # hard cap: 6 minutes, mirrors frontend timeout
        POLL_DB_INTERVAL = 2.0       # seconds between DB checks (light on DB for 500 students)
        HEARTBEAT_INTERVAL = 15      # seconds between keep-alive events
        start_time = time.time()
        last_heartbeat = start_time
        # Sentinel -1 guarantees the very first loop iteration emits a
        # progress event even when zero feedback is ready yet.
        last_ready_count = -1

        try:
            while True:
                # Stop immediately if the client went away.
                if await request.is_disconnected():
                    break

                # Enforce the maximum stream duration.
                elapsed = time.time() - start_time
                if elapsed >= MAX_DURATION:
                    yield f"event: timeout\ndata: {json.dumps({'message': 'Stream timeout after 6 minutes'})}\n\n"
                    break

                # NOTE(review): these SQLAlchemy calls are synchronous and run
                # on the event loop; with many concurrent streams consider
                # offloading to a threadpool (e.g. anyio.to_thread) — confirm
                # under load. A fresh short-lived session per poll avoids
                # holding a connection for the stream's lifetime.
                db = SessionLocal()
                try:
                    # Total answers for this student/module/attempt.
                    total = (
                        db.query(func.count(StudentAnswer.id))
                        .filter(
                            StudentAnswer.student_id == student_id,
                            StudentAnswer.module_id == module_id,
                            StudentAnswer.attempt == attempt,
                        )
                        .scalar()
                    ) or 0

                    # One grouped query yields counts per generation_status
                    # (completed / failed / timeout / ...) in a single pass.
                    status_counts = dict(
                        db.query(
                            AIFeedback.generation_status,
                            func.count(AIFeedback.id)
                        )
                        .join(StudentAnswer, AIFeedback.answer_id == StudentAnswer.id)
                        .filter(
                            StudentAnswer.student_id == student_id,
                            StudentAnswer.module_id == module_id,
                            StudentAnswer.attempt == attempt,
                        )
                        .group_by(AIFeedback.generation_status)
                        .all()
                    )

                    ready = status_counts.get('completed', 0)
                    # Both hard failures and timeouts count as "failed" for the client.
                    failed = status_counts.get('failed', 0) + status_counts.get('timeout', 0)

                    # Jobs still queued/processing — completion requires zero.
                    pending_jobs = (
                        db.query(func.count(FeedbackJob.id))
                        .filter(
                            FeedbackJob.student_id == student_id,
                            FeedbackJob.module_id == module_id,
                            FeedbackJob.attempt == attempt,
                            FeedbackJob.status.in_(["queued", "processing"]),
                        )
                        .scalar()
                    ) or 0

                finally:
                    db.close()

                # Percentage intentionally reflects only *successful* feedback.
                percentage = round((ready / total * 100), 1) if total > 0 else 0
                all_complete = (ready + failed) >= total and total > 0 and pending_jobs == 0

                # Emit progress only when the ready count actually changed,
                # keeping the stream quiet between completions.
                if ready != last_ready_count:
                    last_ready_count = ready
                    event_data = {
                        "ready": ready,
                        "total": total,
                        "failed": failed,
                        "percentage": percentage,
                        "pending_jobs": pending_jobs,
                    }
                    yield f"event: progress\ndata: {json.dumps(event_data)}\n\n"

                # Terminal event: everything accounted for, no jobs in flight.
                if all_complete:
                    yield f"event: complete\ndata: {json.dumps({'ready': ready, 'total': total, 'failed': failed})}\n\n"
                    break

                # Periodic keep-alive so proxies don't drop the idle connection.
                now = time.time()
                if now - last_heartbeat >= HEARTBEAT_INTERVAL:
                    last_heartbeat = now
                    yield f"event: heartbeat\ndata: {json.dumps({'elapsed': int(elapsed)})}\n\n"

                await asyncio.sleep(POLL_DB_INTERVAL)

        except asyncio.CancelledError:
            # Client disconnect / server shutdown cancels the generator; log and exit quietly.
            logger.info(f"[SSE] Stream cancelled for module {module_id}")
        except Exception as e:
            logger.error(f"[SSE] Error in stream for module {module_id}: {e}")
            # Truncate the message so a huge traceback never bloats the event.
            yield f"event: error\ndata: {json.dumps({'message': str(e)[:200]})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )


@router.get("/{feedback_id}")
def get_ai_feedback_by_id(
feedback_id: UUID,
Expand Down Expand Up @@ -237,7 +378,6 @@ def retry_feedback_generation(
@router.post("/retry/module/{module_id}")
def retry_all_failed_feedback(
module_id: UUID,
background_tasks: BackgroundTasks,
student_id: str = Query(..., description="Student ID"),
attempt: int = Query(1, description="Attempt number", ge=1),
db: Session = Depends(get_db)
Expand Down Expand Up @@ -319,7 +459,7 @@ def retry_all_failed_feedback(
logger.info(f"📝 Including answer {answer.id} - no feedback exists, creating pending record")
# Create pending feedback record so background task can work properly
from app.crud.ai_feedback import create_pending_feedback
create_pending_feedback(db=db, answer_id=answer.id, timeout_seconds=120)
create_pending_feedback(db=db, answer_id=answer.id, timeout_seconds=45)
failed_answer_ids.append(str(answer.id))

elif feedback.generation_status is None:
Expand Down Expand Up @@ -390,27 +530,23 @@ def retry_all_failed_feedback(
"answer_ids": []
}

logger.info(f"🚀 ============ RETRYING {len(failed_answer_ids)} FAILED QUESTIONS ============")
logger.info(f"🚀 Answer IDs to retry: {failed_answer_ids}")

# Import and run the same background task used for initial submission
from app.api.routes.student import generate_feedback_background
logger.info(f"Retrying {len(failed_answer_ids)} failed questions via job queue")

# Add to FastAPI background tasks
logger.info(f"🎯 Adding background task: generate_feedback_background(student={student_id}, module={module_id}, attempt={attempt}, answer_ids={len(failed_answer_ids)} items)")
background_tasks.add_task(
generate_feedback_background,
student_id=student_id,
module_id=str(module_id),
attempt=attempt,
answer_ids=failed_answer_ids
)

logger.info(f"✅ Background task added successfully! Feedback generation will start shortly for {len(failed_answer_ids)} questions")
# Enqueue retry jobs — the worker picks them up automatically
from app.services.feedback_worker import create_feedback_job
for answer_id_str in failed_answer_ids:
create_feedback_job(
db=db,
answer_id=answer_id_str,
student_id=student_id,
module_id=str(module_id),
attempt=attempt,
priority=1,
)

return {
"success": True,
"message": f"Regenerating feedback for {len(failed_answer_ids)} question(s). This may take a few moments.",
"message": f"Enqueued {len(failed_answer_ids)} feedback retries. The worker will process them shortly.",
"answers_retried": len(failed_answer_ids),
"answer_ids": failed_answer_ids,
"total_answers": len(answers),
Expand Down
Loading
Loading