From 8c23d761ee479f13acd588ee77de0a4d270ca53e Mon Sep 17 00:00:00 2001 From: Jesse Stewart Date: Fri, 13 Mar 2026 12:58:24 -0400 Subject: [PATCH 1/4] feat: certificates all learners list api --- lms/djangoapps/instructor/views/api_urls.py | 20 + lms/djangoapps/instructor/views/api_v2.py | 465 ++++++++++++++++++ .../instructor/views/serializers_v2.py | 54 ++ lms/envs/devstack.py | 3 + lms/urls.py | 5 + 5 files changed, 547 insertions(+) diff --git a/lms/djangoapps/instructor/views/api_urls.py b/lms/djangoapps/instructor/views/api_urls.py index b61120e58c13..c2ce08e23cd5 100644 --- a/lms/djangoapps/instructor/views/api_urls.py +++ b/lms/djangoapps/instructor/views/api_urls.py @@ -66,6 +66,26 @@ api_v2.ORASummaryView.as_view(), name='ora_summary' ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/issued$', + api_v2.IssuedCertificatesView.as_view(), + name='issued_certificates' + ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/generation_history$', + api_v2.CertificateGenerationHistoryView.as_view(), + name='certificate_generation_history' + ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/regenerate$', + api_v2.RegenerateCertificatesView.as_view(), + name='regenerate_certificates' + ), + re_path( + rf'^courses/{COURSE_ID_PATTERN}/certificates/config$', + api_v2.CertificateConfigView.as_view(), + name='certificate_config' + ), ] urlpatterns = [ diff --git a/lms/djangoapps/instructor/views/api_v2.py b/lms/djangoapps/instructor/views/api_v2.py index dd399dcb064a..28bc0d79a9df 100644 --- a/lms/djangoapps/instructor/views/api_v2.py +++ b/lms/djangoapps/instructor/views/api_v2.py @@ -34,6 +34,13 @@ from xmodule.modulestore.django import modulestore from xmodule.modulestore.exceptions import ItemNotFoundError +from rest_framework.generics import GenericAPIView +from rest_framework.exceptions import NotFound +from django.db import transaction +from django.utils.decorators import method_decorator +from django.views.decorators.cache import cache_control +from django.utils.html import strip_tags +from django.utils.translation import gettext as _ from common.djangoapps.util.json_request import JsonResponseBadRequest from openedx.core.djangoapps.course_groups.cohorts import is_course_cohorted from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers @@ -59,6 +66,9 @@ UnitExtensionSerializer, ORASerializer, ORASummarySerializer, + IssuedCertificateSerializer, + CertificateGenerationHistorySerializer, + RegenerateCertificatesSerializer, ) from .tools import ( find_unit, @@ -1097,3 +1107,458 @@ def get(self, request, *args, **kwargs): serializer = self.get_serializer(items) return Response(serializer.data) + + +class IssuedCertificatesView(ListAPIView): + """ + View to retrieve issued certificates for a course with allowlist and invalidation details. + + **Example Requests** + + GET /api/instructor/v2/courses/{course_id}/certificates/issued + GET /api/instructor/v2/courses/{course_id}/certificates/issued?search=username + GET /api/instructor/v2/courses/{course_id}/certificates/issued?filter=received + GET /api/instructor/v2/courses/{course_id}/certificates/issued?page=2&page_size=50 + + **Response Values** + + { + "count": 100, + "next": "http://example.com/api/instructor/v2/courses/.../certificates/issued?page=2", + "previous": null, + "results": [ + { + "username": "student1", + "email": "student1@example.com", + "enrollment_track": "verified", + "certificate_status": "downloadable", + "special_case": "Exception", + "exception_granted": "January 15, 2024", + "exception_notes": "Medical emergency", + "invalidated_by": null, + "invalidation_date": null + }, + ... + ] + } + + **Parameters** + + course_id: Course key for the course + search (optional): Filter by username or email + filter (optional): Filter certificates by category: + - "all": All Learners (default) + - "received": Received (downloadable certificates) + - "not_received": Not Received (not passing, unavailable) + - "audit_passing": Audit - Passing + - "audit_not_passing": Audit - Not Passing + - "error": Error State + - "granted_exceptions": Granted Exceptions (allowlisted) + - "invalidated": Invalidated + page (optional): Page number for pagination + page_size (optional): Number of results per page + + **Returns** + + * 200: OK - Returns paginated list of issued certificates + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + * 404: Not Found - Course does not exist + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.VIEW_DASHBOARD + serializer_class = IssuedCertificateSerializer + + def get_queryset(self): + """ + Returns the queryset of issued certificates with allowlist and invalidation information. + """ + from lms.djangoapps.certificates.models import ( + GeneratedCertificate, + CertificateAllowlist, + CertificateInvalidation + ) + from lms.djangoapps.certificates.data import CertificateStatuses + from common.djangoapps.student.models import CourseEnrollment + + course_id = self.kwargs["course_id"] + course_key = CourseKey.from_string(course_id) + + # Validate that the course exists + get_course_by_id(course_key) + + # Get query parameters + search = self.request.query_params.get("search", "").lower() + filter_type = self.request.query_params.get("filter", "all") + + # Get certificates for the course + # Note: eligible_certificates manager excludes audit_passing/audit_notpassing by default + # For audit filters, we use objects manager to include them + if filter_type in ['audit_passing', 'audit_not_passing', 'all']: + # Use objects manager to include audit certificates when needed + certificates = GeneratedCertificate.objects.filter( + course_id=course_key + ).select_related('user', 'user__profile') + else: + # Use eligible_certificates for other filters (excludes audit certs) + certificates = GeneratedCertificate.eligible_certificates.filter( + course_id=course_key + ).select_related('user', 'user__profile') + + # Debug logging + import logging + log = logging.getLogger(__name__) + cert_count = certificates.count() + log.info(f"Certificate query for course {course_key}: found {cert_count} certificates") + log.info(f"Filter type: {filter_type}, Search: '{search}'") + if cert_count > 0: + for cert in list(certificates[:3]): + log.info(f" - User: {cert.user.username}, Status: {cert.status}, Mode: {cert.mode}") + + # Apply filter based on filter type + if filter_type == "received": + certificates = certificates.filter(status=CertificateStatuses.downloadable) + elif filter_type == "not_received": + certificates = certificates.filter( + status__in=[CertificateStatuses.notpassing, CertificateStatuses.unavailable] + ) + elif filter_type == "audit_passing": + certificates = certificates.filter(status=CertificateStatuses.audit_passing) + elif filter_type == "audit_not_passing": + certificates = certificates.filter(status=CertificateStatuses.audit_notpassing) + elif filter_type == "error": + certificates = certificates.filter(status=CertificateStatuses.error) + + # Get allowlist entries + allowlist_dict = {} + allowlist_entries = CertificateAllowlist.objects.filter( + course_id=course_key, + allowlist=True + ).select_related('user') + + for entry in allowlist_entries: + allowlist_dict[entry.user_id] = { + 'created': entry.created.strftime("%B %d, %Y"), + 'notes': entry.notes or '' + } + + # Get invalidation entries + invalidation_dict = {} + invalidations = CertificateInvalidation.objects.filter( + generated_certificate__course_id=course_key, + active=True + ).select_related('generated_certificate__user', 'invalidated_by') + + for inv in invalidations: + invalidation_dict[inv.generated_certificate.user_id] = { + 'invalidated_by': inv.invalidated_by.email, + 'created': inv.created.strftime("%B %d, %Y") + } + + # Get enrollment data + enrollments = CourseEnrollment.objects.filter( + course_id=course_key + ).select_related('user') + enrollment_dict = {e.user_id: e.mode for e in enrollments} + + # Build result list + results = [] + for cert in certificates: + user = cert.user + + # Apply search filter + if search and search not in user.username.lower() and search not in user.email.lower(): + continue + + allowlist_info = allowlist_dict.get(user.id) + invalidation_info = invalidation_dict.get(user.id) + + # Determine special case + special_case = None + if allowlist_info: + special_case = "Exception" + elif invalidation_info: + special_case = "Invalidation" + + # Apply additional filters + if filter_type == "granted_exceptions" and not allowlist_info: + continue + elif filter_type == "invalidated" and not invalidation_info: + continue + + results.append({ + 'username': user.username, + 'email': user.email, + 'enrollment_track': enrollment_dict.get(user.id, 'TBD'), + 'certificate_status': cert.status, + 'special_case': special_case, + 'exception_granted': allowlist_info['created'] if allowlist_info else None, + 'exception_notes': allowlist_info['notes'] if allowlist_info else None, + 'invalidated_by': invalidation_info['invalidated_by'] if invalidation_info else None, + 'invalidation_date': invalidation_info['created'] if invalidation_info else None, + }) + + # Sort by username + results.sort(key=lambda x: x['username']) + + return results + + +class CertificateGenerationHistoryView(ListAPIView): + """ + View to retrieve certificate generation history for a course. + + **Example Requests** + + GET /api/instructor/v2/courses/{course_id}/certificates/generation_history + GET /api/instructor/v2/courses/{course_id}/certificates/generation_history?page=2 + + **Response Values** + + { + "count": 25, + "next": "http://example.com/api/instructor/v2/courses/.../certificates/generation_history?page=2", + "previous": null, + "results": [ + { + "task_name": "Regenerated", + "date": "January 15, 2024", + "details": "audit not passing states" + }, + { + "task_name": "Generated", + "date": "January 10, 2024", + "details": "For exceptions" + }, + ... + ] + } + + **Parameters** + + course_id: Course key for the course + page (optional): Page number for pagination + page_size (optional): Number of results per page + + **Returns** + + * 200: OK - Returns paginated list of certificate generation history + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + * 404: Not Found - Course does not exist + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.VIEW_DASHBOARD + serializer_class = CertificateGenerationHistorySerializer + + def get_queryset(self): + """ + Returns the queryset of certificate generation history. + """ + from lms.djangoapps.certificates.models import CertificateGenerationHistory + + course_id = self.kwargs["course_id"] + course_key = CourseKey.from_string(course_id) + + # Validate that the course exists + get_course_by_id(course_key) + + # Get generation history + history = CertificateGenerationHistory.objects.filter( + course_id=course_key + ).select_related('generated_by', 'instructor_task').order_by('-created') + + # Build result list + results = [] + for entry in history: + # Determine task name + task_name = "Regenerated" if entry.is_regeneration else "Generated" + + # Format date + date = entry.created.strftime("%B %d, %Y") + + # Get details about what was generated/regenerated + details = str(entry.get_certificate_generation_candidates()) + + results.append({ + 'task_name': task_name, + 'date': date, + 'details': details, + }) + + return results + + +@method_decorator(cache_control(no_cache=True, no_store=True, must_revalidate=True), name='dispatch') +@method_decorator(transaction.non_atomic_requests, name='dispatch') +class RegenerateCertificatesView(DeveloperErrorViewMixin, APIView): + """ + View to regenerate certificates for a course. + + **Use Cases** + + Regenerate certificates for learners in a course, optionally filtering by certificate status + or student set (all learners or allowlisted learners). + + **Example Requests** + + POST /api/instructor/v2/courses/{course_id}/certificates/regenerate + + Request Body: + { + "statuses": ["downloadable", "notpassing"], + "student_set": "all" + } + + **Request Body Parameters** + + statuses (optional): List of certificate statuses to regenerate + student_set (optional): "all" for all learners, "allowlisted" for allowlisted learners only + + **Response Values** + + { + "task_id": "abc-123", + "message": "Certificate regeneration task has been started" + } + + **Returns** + + * 200: OK - Certificate regeneration task started successfully + * 400: Bad Request - Invalid parameters + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + * 404: Not Found - Course does not exist + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.GIVE_STUDENT_EXTENSION + + @apidocs.schema( + parameters=[ + apidocs.string_parameter( + 'course_id', + apidocs.ParameterLocation.PATH, + description="Course key for the course.", + ), + ], + body=RegenerateCertificatesSerializer, + responses={ + 200: "Certificate regeneration task started successfully", + 400: "Invalid parameters provided.", + 401: "The requesting user is not authenticated.", + 403: "The requesting user lacks instructor access to the course.", + 404: "The requested course does not exist.", + }, + ) + def post(self, request, course_id): + """ + Initiate certificate regeneration for a course. + """ + from lms.djangoapps.instructor_task import api as task_api + + course_key = CourseKey.from_string(course_id) + course = get_course_by_id(course_key) + + serializer = RegenerateCertificatesSerializer(data=request.data) + if not serializer.is_valid(): + return Response( + {'error': serializer.errors}, + status=status.HTTP_400_BAD_REQUEST + ) + + statuses = serializer.validated_data.get('statuses', []) + student_set = serializer.validated_data.get('student_set', 'all') + + try: + # Submit certificate generation/regeneration task + if student_set == 'allowlisted': + # Generate for allowlisted students only + task = task_api.generate_certificates_for_students( + request, + course_key, + student_set='all_allowlisted' + ) + elif statuses: + # Regenerate for specified statuses + task = task_api.regenerate_certificates( + request, + course_key, + statuses_to_regenerate=statuses + ) + else: + # Generate for all students + task = task_api.generate_certificates_for_students( + request, + course_key + ) + + return Response({ + 'task_id': task.task_id, + 'message': _('Certificate regeneration task has been started') + }, status=status.HTTP_200_OK) + + except Exception as exc: + log.error(f"Error starting certificate regeneration: {exc}") + return Response( + {'error': str(exc)}, + status=status.HTTP_400_BAD_REQUEST + ) + + +class CertificateConfigView(DeveloperErrorViewMixin, APIView): + """ + View to retrieve certificate configuration for a course. + + **Use Cases** + + Check if certificate generation is enabled for the platform and validate course existence. + + **Example Requests** + + GET /api/instructor/v2/courses/{course_id}/certificates/config + + **Response Values** + + { + "enabled": true + } + + **Returns** + + * 200: OK - Returns certificate configuration + * 401: Unauthorized - User is not authenticated + * 403: Forbidden - User lacks instructor permissions + * 404: Not Found - Course does not exist + """ + permission_classes = (IsAuthenticated, permissions.InstructorPermission) + permission_name = permissions.VIEW_DASHBOARD + + @apidocs.schema( + parameters=[ + apidocs.string_parameter( + 'course_id', + apidocs.ParameterLocation.PATH, + description="Course key for the course.", + ), + ], + responses={ + 200: "Returns certificate configuration.", + 401: "The requesting user is not authenticated.", + 403: "The requesting user lacks instructor access to the course.", + 404: "The requested course does not exist.", + }, + ) + def get(self, request, course_id): + """ + Retrieve certificate configuration. + """ + from lms.djangoapps.certificates import api as certs_api + + course_key = CourseKey.from_string(course_id) + # Validate that the course exists + get_course_by_id(course_key) + + # Check if certificate generation is enabled + enabled = certs_api.is_certificate_generation_enabled() + + return Response({'enabled': enabled}, status=status.HTTP_200_OK) diff --git a/lms/djangoapps/instructor/views/serializers_v2.py b/lms/djangoapps/instructor/views/serializers_v2.py index bb09dc5111f9..77a37331b878 100644 --- a/lms/djangoapps/instructor/views/serializers_v2.py +++ b/lms/djangoapps/instructor/views/serializers_v2.py @@ -476,3 +476,57 @@ class ORASummarySerializer(serializers.Serializer): waiting = serializers.IntegerField() staff = serializers.IntegerField() final_grade_received = serializers.IntegerField() + + +class IssuedCertificateSerializer(serializers.Serializer): + """ + Serializer for issued certificates with allowlist and invalidation information. + """ + username = serializers.CharField(help_text="Username of the learner") + email = serializers.EmailField(help_text="Email address of the learner") + enrollment_track = serializers.CharField(help_text="Enrollment track/mode (e.g., verified, audit)") + certificate_status = serializers.CharField(help_text="Certificate status (e.g., downloadable, notpassing)") + special_case = serializers.CharField( + allow_null=True, + help_text="Special case type (Exception or Invalidation)" + ) + exception_granted = serializers.CharField( + allow_null=True, + help_text="Date when exception was granted" + ) + exception_notes = serializers.CharField( + allow_null=True, + help_text="Notes about the exception" + ) + invalidated_by = serializers.CharField( + allow_null=True, + help_text="Email of user who invalidated the certificate" + ) + invalidation_date = serializers.CharField( + allow_null=True, + help_text="Date when certificate was invalidated" + ) + + +class CertificateGenerationHistorySerializer(serializers.Serializer): + """ + Serializer for certificate generation history. + """ + task_name = serializers.CharField(help_text="Task name (Generated or Regenerated)") + date = serializers.CharField(help_text="Date when the task was created (formatted)") + details = serializers.CharField(help_text="Details about the certificate generation (e.g., 'audit not passing states', 'For exceptions')") + + +class RegenerateCertificatesSerializer(serializers.Serializer): + """ + Serializer for regenerating certificates request. + """ + statuses = serializers.ListField( + child=serializers.CharField(), + required=False, + help_text="Certificate statuses to regenerate" + ) + student_set = serializers.CharField( + required=False, + help_text="Student set filter (e.g., 'all', 'allowlisted')" + ) diff --git a/lms/envs/devstack.py b/lms/envs/devstack.py index dd7097df7178..9a685f35b07f 100644 --- a/lms/envs/devstack.py +++ b/lms/envs/devstack.py @@ -403,6 +403,9 @@ def should_show_debug_toolbar(request): # lint-amnesty, pylint: disable=missing ################### FRONTEND APPLICATION DISCUSSIONS ################### DISCUSSIONS_MICROFRONTEND_URL = 'http://localhost:2002' +################### FRONTEND APPLICATION INSTRUCTOR ################### +INSTRUCTOR_MICROFRONTEND_URL = 'http://apps.local.openedx.io:2003' + ################### FRONTEND APPLICATION DISCUSSIONS FEEDBACK URL################### DISCUSSIONS_MFE_FEEDBACK_URL = None diff --git a/lms/urls.py b/lms/urls.py index 25c5a04f78cd..ea1f9a3bde7a 100644 --- a/lms/urls.py +++ b/lms/urls.py @@ -1053,6 +1053,11 @@ path('api/instructor_task/', include('lms.djangoapps.instructor_task.rest_api.urls')), ] +# Instructor API URLs +urlpatterns += [ + path('', include('lms.djangoapps.instructor.urls')), +] + # MFE API urls urlpatterns += [ path('api/mfe_config/v1', include(('lms.djangoapps.mfe_config_api.urls', 'lms.djangoapps.mfe_config_api'), namespace='mfe_config_api')) From 3faa81c6a71124de0b61e4410b708e2b9cb55beb Mon Sep 17 00:00:00 2001 From: Jesse Stewart Date: Mon, 16 Mar 2026 10:37:31 -0400 Subject: [PATCH 2/4] fix: linting --- lms/djangoapps/instructor/views/api_v2.py | 152 +++++++++--------- .../instructor/views/serializers_v2.py | 4 +- 2 files changed, 83 insertions(+), 73 deletions(-) diff --git a/lms/djangoapps/instructor/views/api_v2.py b/lms/djangoapps/instructor/views/api_v2.py index 28bc0d79a9df..545a099092a6 100644 --- a/lms/djangoapps/instructor/views/api_v2.py +++ b/lms/djangoapps/instructor/views/api_v2.py @@ -34,13 +34,6 @@ from xmodule.modulestore.django import modulestore from xmodule.modulestore.exceptions import ItemNotFoundError -from rest_framework.generics import GenericAPIView -from rest_framework.exceptions import NotFound -from django.db import transaction -from django.utils.decorators import method_decorator -from django.views.decorators.cache import cache_control -from django.utils.html import strip_tags -from django.utils.translation import gettext as _ from common.djangoapps.util.json_request import JsonResponseBadRequest from openedx.core.djangoapps.course_groups.cohorts import is_course_cohorted from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers @@ -1169,6 +1162,77 @@ class IssuedCertificatesView(ListAPIView): permission_name = permissions.VIEW_DASHBOARD serializer_class = IssuedCertificateSerializer + def _apply_certificate_status_filter(self, certificates, filter_type, CertificateStatuses): + """Apply status-based filters to certificate queryset.""" + if filter_type == "received": + return certificates.filter(status=CertificateStatuses.downloadable) + elif filter_type == "not_received": + return certificates.filter( + status__in=[CertificateStatuses.notpassing, CertificateStatuses.unavailable] + ) + elif filter_type == "audit_passing": + return certificates.filter(status=CertificateStatuses.audit_passing) + elif filter_type == "audit_not_passing": + return certificates.filter(status=CertificateStatuses.audit_notpassing) + elif filter_type == "error": + return certificates.filter(status=CertificateStatuses.error) + return certificates + + def _get_allowlist_dict(self, course_key, CertificateAllowlist): + """Get allowlist entries as a dictionary keyed by user_id.""" + allowlist_dict = {} + allowlist_entries = CertificateAllowlist.objects.filter( + course_id=course_key, + allowlist=True + ).select_related('user') + + for entry in allowlist_entries: + allowlist_dict[entry.user_id] = { + 'created': entry.created.strftime("%B %d, %Y"), + 'notes': entry.notes or '' + } + return allowlist_dict + + def _get_invalidation_dict(self, course_key, CertificateInvalidation): + """Get invalidation entries as a dictionary keyed by user_id.""" + invalidation_dict = {} + invalidations = CertificateInvalidation.objects.filter( + generated_certificate__course_id=course_key, + active=True + ).select_related('generated_certificate__user', 'invalidated_by') + + for inv in invalidations: + invalidation_dict[inv.generated_certificate.user_id] = { + 'invalidated_by': inv.invalidated_by.email, + 'created': inv.created.strftime("%B %d, %Y") + } + return invalidation_dict + + def _build_certificate_result(self, cert, enrollment_dict, allowlist_dict, invalidation_dict): + """Build result dictionary for a single certificate.""" + user = cert.user + allowlist_info = allowlist_dict.get(user.id) + invalidation_info = invalidation_dict.get(user.id) + + # Determine special case + special_case = None + if allowlist_info: + special_case = "Exception" + elif invalidation_info: + special_case = "Invalidation" + + return { + 'username': user.username, + 'email': user.email, + 'enrollment_track': enrollment_dict.get(user.id, 'TBD'), + 'certificate_status': cert.status, + 'special_case': special_case, + 'exception_granted': allowlist_info['created'] if allowlist_info else None, + 'exception_notes': allowlist_info['notes'] if allowlist_info else None, + 'invalidated_by': invalidation_info['invalidated_by'] if invalidation_info else None, + 'invalidation_date': invalidation_info['created'] if invalidation_info else None, + } + def get_queryset(self): """ Returns the queryset of issued certificates with allowlist and invalidation information. @@ -1192,22 +1256,16 @@ def get_queryset(self): filter_type = self.request.query_params.get("filter", "all") # Get certificates for the course - # Note: eligible_certificates manager excludes audit_passing/audit_notpassing by default - # For audit filters, we use objects manager to include them if filter_type in ['audit_passing', 'audit_not_passing', 'all']: - # Use objects manager to include audit certificates when needed certificates = GeneratedCertificate.objects.filter( course_id=course_key ).select_related('user', 'user__profile') else: - # Use eligible_certificates for other filters (excludes audit certs) certificates = GeneratedCertificate.eligible_certificates.filter( course_id=course_key ).select_related('user', 'user__profile') # Debug logging - import logging - log = logging.getLogger(__name__) cert_count = certificates.count() log.info(f"Certificate query for course {course_key}: found {cert_count} certificates") log.info(f"Filter type: {filter_type}, Search: '{search}'") @@ -1216,44 +1274,11 @@ def get_queryset(self): log.info(f" - User: {cert.user.username}, Status: {cert.status}, Mode: {cert.mode}") # Apply filter based on filter type - if filter_type == "received": - certificates = certificates.filter(status=CertificateStatuses.downloadable) - elif filter_type == "not_received": - certificates = certificates.filter( - status__in=[CertificateStatuses.notpassing, CertificateStatuses.unavailable] - ) - elif filter_type == "audit_passing": - certificates = certificates.filter(status=CertificateStatuses.audit_passing) - elif filter_type == "audit_not_passing": - certificates = certificates.filter(status=CertificateStatuses.audit_notpassing) - elif filter_type == "error": - certificates = certificates.filter(status=CertificateStatuses.error) - - # Get allowlist entries - allowlist_dict = {} - allowlist_entries = CertificateAllowlist.objects.filter( - course_id=course_key, - allowlist=True - ).select_related('user') - - for entry in allowlist_entries: - allowlist_dict[entry.user_id] = { - 'created': entry.created.strftime("%B %d, %Y"), - 'notes': entry.notes or '' - } - - # Get invalidation entries - invalidation_dict = {} - invalidations = CertificateInvalidation.objects.filter( - generated_certificate__course_id=course_key, - active=True - ).select_related('generated_certificate__user', 'invalidated_by') + certificates = self._apply_certificate_status_filter(certificates, filter_type, CertificateStatuses) - for inv in invalidations: - invalidation_dict[inv.generated_certificate.user_id] = { - 'invalidated_by': inv.invalidated_by.email, - 'created': inv.created.strftime("%B %d, %Y") - } + # Get related data + allowlist_dict = self._get_allowlist_dict(course_key, CertificateAllowlist) + invalidation_dict = self._get_invalidation_dict(course_key, CertificateInvalidation) # Get enrollment data enrollments = CourseEnrollment.objects.filter( @@ -1270,33 +1295,18 @@ def get_queryset(self): if search and search not in user.username.lower() and search not in user.email.lower(): continue + # Apply special case filters allowlist_info = allowlist_dict.get(user.id) invalidation_info = invalidation_dict.get(user.id) - # Determine special case - special_case = None - if allowlist_info: - special_case = "Exception" - elif invalidation_info: - special_case = "Invalidation" - - # Apply additional filters if filter_type == "granted_exceptions" and not allowlist_info: continue elif filter_type == "invalidated" and not invalidation_info: continue - results.append({ - 'username': user.username, - 'email': user.email, - 'enrollment_track': enrollment_dict.get(user.id, 'TBD'), - 'certificate_status': cert.status, - 'special_case': special_case, - 'exception_granted': allowlist_info['created'] if allowlist_info else None, - 'exception_notes': allowlist_info['notes'] if allowlist_info else None, - 'invalidated_by': invalidation_info['invalidated_by'] if invalidation_info else None, - 'invalidation_date': invalidation_info['created'] if invalidation_info else None, - }) + results.append(self._build_certificate_result( + cert, enrollment_dict, allowlist_dict, invalidation_dict + )) # Sort by username results.sort(key=lambda x: x['username']) @@ -1454,8 +1464,6 @@ def post(self, request, course_id): """ Initiate certificate regeneration for a course. """ - from lms.djangoapps.instructor_task import api as task_api - course_key = CourseKey.from_string(course_id) course = get_course_by_id(course_key) @@ -1497,7 +1505,7 @@ def post(self, request, course_id): 'message': _('Certificate regeneration task has been started') }, status=status.HTTP_200_OK) - except Exception as exc: + except (AlreadyRunningError, QueueConnectionError) as exc: log.error(f"Error starting certificate regeneration: {exc}") return Response( {'error': str(exc)}, diff --git a/lms/djangoapps/instructor/views/serializers_v2.py b/lms/djangoapps/instructor/views/serializers_v2.py index 77a37331b878..ab13b02b49c5 100644 --- a/lms/djangoapps/instructor/views/serializers_v2.py +++ b/lms/djangoapps/instructor/views/serializers_v2.py @@ -514,7 +514,9 @@ class CertificateGenerationHistorySerializer(serializers.Serializer): """ task_name = serializers.CharField(help_text="Task name (Generated or Regenerated)") date = serializers.CharField(help_text="Date when the task was created (formatted)") - details = serializers.CharField(help_text="Details about the certificate generation (e.g., 'audit not passing states', 'For exceptions')") + details = serializers.CharField( + help_text="Details about the certificate generation (e.g., 'audit not passing states', 'For exceptions')" + ) class RegenerateCertificatesSerializer(serializers.Serializer): From 9519ba0abe6a0908f3016309e5bc906a2536d247 Mon Sep 17 00:00:00 2001 From: Jesse Stewart Date: Wed, 18 Mar 2026 16:59:56 -0400 Subject: [PATCH 3/4] fix: PR feedback --- .../instructor/tests/test_api_v2.py | 250 ++++++++++++++++++ lms/djangoapps/instructor/views/api_v2.py | 136 +++++++--- .../instructor/views/serializers_v2.py | 36 ++- 3 files changed, 367 insertions(+), 55 deletions(-) diff --git a/lms/djangoapps/instructor/tests/test_api_v2.py b/lms/djangoapps/instructor/tests/test_api_v2.py index 61797e1e9354..3bcb69a4c445 100644 --- a/lms/djangoapps/instructor/tests/test_api_v2.py +++ b/lms/djangoapps/instructor/tests/test_api_v2.py @@ -1734,3 +1734,253 @@ def test_extension_data_structure(self, mock_title_or_url, mock_get_units, mock_ self.assertIsInstance(extension['email'], str) self.assertIsInstance(extension['unit_title'], str) self.assertIsInstance(extension['unit_location'], str) + + +@ddt.ddt +class IssuedCertificatesViewTest(SharedModuleStoreTestCase): + """ + Tests for the IssuedCertificatesView API endpoint. + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create( + org='edX', + number='TestX', + run='Test_Course', + display_name='Test Course', + ) + cls.course_key = cls.course.id + + def setUp(self): + super().setUp() + self.client = APIClient() + self.instructor = InstructorFactory.create(course_key=self.course_key) + self.staff = StaffFactory.create(course_key=self.course_key) + self.student1 = UserFactory.create(username='student1', email='student1@example.com') + self.student2 = UserFactory.create(username='student2', email='student2@example.com') + + # Enroll students + CourseEnrollmentFactory.create( + user=self.student1, + course_id=self.course_key, + mode='verified', + is_active=True + ) + CourseEnrollmentFactory.create( + user=self.student2, + course_id=self.course_key, + mode='audit', + is_active=True + ) + + def _get_url(self, course_id=None): + """Helper to get the API URL.""" + if course_id is None: + course_id = str(self.course_key) + return reverse('instructor_api_v2:issued_certificates', kwargs={'course_id': course_id}) + + def test_get_issued_certificates_as_staff(self): + """ + Test that staff can retrieve issued certificates. + """ + self.client.force_authenticate(user=self.staff) + response = self.client.get(self._get_url()) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn('results', response.data) + self.assertIn('count', response.data) + + def test_get_issued_certificates_unauthorized(self): + """ + Test that students cannot access issued certificates endpoint. + """ + self.client.force_authenticate(user=self.student1) + response = self.client.get(self._get_url()) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_get_issued_certificates_unauthenticated(self): + """ + Test that unauthenticated users cannot access the endpoint. + """ + response = self.client.get(self._get_url()) + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_get_issued_certificates_nonexistent_course(self): + """ + Test error handling for non-existent course. + """ + self.client.force_authenticate(user=self.instructor) + nonexistent_course_id = 'course-v1:edX+NonExistent+2024' + response = self.client.get(self._get_url(course_id=nonexistent_course_id)) + + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + @patch('lms.djangoapps.instructor.views.api_v2.GeneratedCertificate.objects.filter') + def test_search_filter(self, mock_filter): + """ + Test filtering certificates by search term. + """ + # Mock queryset methods + mock_queryset = Mock() + mock_queryset.select_related.return_value = mock_queryset + mock_queryset.__iter__ = Mock(return_value=iter([])) + mock_filter.return_value = mock_queryset + + self.client.force_authenticate(user=self.instructor) + params = {'search': 'student1'} + response = self.client.get(self._get_url(), params) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + @ddt.data( + 'received', + 'not_received', + 'audit_passing', + 'audit_not_passing', + 'error', + 'granted_exceptions', + 'invalidated', + ) + def test_filter_types(self, filter_type): + """ + Test various filter types for certificates. + """ + self.client.force_authenticate(user=self.instructor) + params = {'filter': filter_type} + response = self.client.get(self._get_url(), params) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn('results', response.data) + + def test_pagination(self): + """ + Test pagination parameters work correctly. + """ + self.client.force_authenticate(user=self.instructor) + params = {'page': '1', 'page_size': '10'} + response = self.client.get(self._get_url(), params) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn('count', response.data) + self.assertIn('next', response.data) + self.assertIn('previous', response.data) + self.assertIn('results', response.data) + + +@ddt.ddt +class CertificateGenerationHistoryViewTest(SharedModuleStoreTestCase): + """ + Tests for the CertificateGenerationHistoryView API endpoint. + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create( + org='edX', + number='TestX', + run='Test_Course', + display_name='Test Course', + ) + cls.course_key = cls.course.id + + def setUp(self): + super().setUp() + self.client = APIClient() + self.instructor = InstructorFactory.create(course_key=self.course_key) + self.staff = StaffFactory.create(course_key=self.course_key) + self.student = UserFactory.create() + + def _get_url(self, course_id=None): + """Helper to get the API URL.""" + if course_id is None: + course_id = str(self.course_key) + return reverse('instructor_api_v2:certificate_generation_history', kwargs={'course_id': course_id}) + + def test_get_generation_history_as_staff(self): + """ + Test that staff can retrieve certificate generation history. + """ + self.client.force_authenticate(user=self.staff) + response = self.client.get(self._get_url()) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn('results', response.data) + self.assertIn('count', response.data) + + def test_get_generation_history_unauthorized(self): + """ + Test that students cannot access generation history endpoint. + """ + self.client.force_authenticate(user=self.student) + response = self.client.get(self._get_url()) + + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_get_generation_history_unauthenticated(self): + """ + Test that unauthenticated users cannot access the endpoint. + """ + response = self.client.get(self._get_url()) + self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + + def test_get_generation_history_nonexistent_course(self): + """ + Test error handling for non-existent course. + """ + self.client.force_authenticate(user=self.instructor) + nonexistent_course_id = 'course-v1:edX+NonExistent+2024' + response = self.client.get(self._get_url(course_id=nonexistent_course_id)) + + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_pagination(self): + """ + Test pagination parameters work correctly. + """ + self.client.force_authenticate(user=self.instructor) + params = {'page': '1', 'page_size': '10'} + response = self.client.get(self._get_url(), params) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn('count', response.data) + self.assertIn('next', response.data) + self.assertIn('previous', response.data) + self.assertIn('results', response.data) + + @patch('lms.djangoapps.instructor.views.api_v2.CertificateGenerationHistory.objects.filter') + def test_history_entry_structure(self, mock_filter): + """ + Test that history entries have the correct structure. + """ + # Mock history entry + mock_entry = Mock() + mock_entry.is_regeneration = True + mock_entry.created = datetime(2024, 1, 15, 10, 30, 0, tzinfo=UTC) + mock_entry.get_certificate_generation_candidates.return_value = "audit not passing states" + + mock_queryset = Mock() + mock_queryset.select_related.return_value = mock_queryset + mock_queryset.order_by.return_value = [mock_entry] + mock_filter.return_value = mock_queryset + + self.client.force_authenticate(user=self.instructor) + response = self.client.get(self._get_url()) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + results = response.data['results'] + + if results: + entry = results[0] + # Verify all required fields are present + self.assertIn('task_name', entry) + self.assertIn('date', entry) + self.assertIn('details', entry) + + # Verify data types + self.assertIsInstance(entry['task_name'], str) + self.assertIsInstance(entry['date'], str) + self.assertIsInstance(entry['details'], str) diff --git a/lms/djangoapps/instructor/views/api_v2.py b/lms/djangoapps/instructor/views/api_v2.py index 545a099092a6..c7b1d8c47a94 100644 --- a/lms/djangoapps/instructor/views/api_v2.py +++ b/lms/djangoapps/instructor/views/api_v2.py @@ -17,6 +17,7 @@ import edx_api_doc_tools as apidocs from django.conf import settings from django.db import transaction +from django.db.models import Q from django.utils.decorators import method_decorator from django.utils.html import strip_tags from django.utils.translation import gettext as _ @@ -52,6 +53,15 @@ from lms.djangoapps.instructor_task.tasks_helper.utils import upload_csv_file_to_report_store from openedx.core.lib.api.view_utils import DeveloperErrorViewMixin from openedx.core.lib.courses import get_course_by_id +from common.djangoapps.student.models import CourseEnrollment +from lms.djangoapps.certificates.models import ( + GeneratedCertificate, + CertificateAllowlist, + CertificateInvalidation, + CertificateGenerationHistory, +) +from lms.djangoapps.certificates.data import CertificateStatuses +from lms.djangoapps.certificates import api as certs_api from .serializers_v2 import ( InstructorTaskListSerializer, CourseInformationSerializerV2, @@ -1188,7 +1198,7 @@ def _get_allowlist_dict(self, course_key, CertificateAllowlist): for entry in allowlist_entries: allowlist_dict[entry.user_id] = { - 'created': entry.created.strftime("%B %d, %Y"), + 'created': entry.created.isoformat(), 'notes': entry.notes or '' } return allowlist_dict @@ -1204,7 +1214,7 @@ def _get_invalidation_dict(self, course_key, CertificateInvalidation): for inv in invalidations: invalidation_dict[inv.generated_certificate.user_id] = { 'invalidated_by': inv.invalidated_by.email, - 'created': inv.created.strftime("%B %d, %Y") + 'created': inv.created.isoformat() } return invalidation_dict @@ -1235,16 +1245,11 @@ def _build_certificate_result(self, cert, enrollment_dict, allowlist_dict, inval def get_queryset(self): """ - Returns the queryset of issued certificates with allowlist and invalidation information. - """ - from lms.djangoapps.certificates.models import ( - GeneratedCertificate, - CertificateAllowlist, - CertificateInvalidation - ) - from lms.djangoapps.certificates.data import CertificateStatuses - from common.djangoapps.student.models import CourseEnrollment + Returns the queryset of issued certificates for the course. + This method returns a Django QuerySet that will be further processed + by the list() method to build the final response data. + """ course_id = self.kwargs["course_id"] course_key = CourseKey.from_string(course_id) @@ -1252,8 +1257,8 @@ def get_queryset(self): get_course_by_id(course_key) # Get query parameters - search = self.request.query_params.get("search", "").lower() filter_type = self.request.query_params.get("filter", "all") + search = self.request.query_params.get("search", "").strip() # Get certificates for the course if filter_type in ['audit_passing', 'audit_not_passing', 'all']: @@ -1265,17 +1270,33 @@ def get_queryset(self): course_id=course_key ).select_related('user', 'user__profile') + # Apply search filter at database level + if search: + certificates = certificates.filter( + Q(user__username__icontains=search) | Q(user__email__icontains=search) + ) + # Debug logging - cert_count = certificates.count() - log.info(f"Certificate query for course {course_key}: found {cert_count} certificates") - log.info(f"Filter type: {filter_type}, Search: '{search}'") - if cert_count > 0: - for cert in list(certificates[:3]): - log.info(f" - User: {cert.user.username}, Status: {cert.status}, Mode: {cert.mode}") + log.debug(f"Certificate query for course {course_key}: found {certificates.count()} certificates, filter_type: {filter_type}") # Apply filter based on filter type certificates = self._apply_certificate_status_filter(certificates, filter_type, CertificateStatuses) + return certificates + + def list(self, request, *args, **kwargs): + """ + Override list method to process certificates and return paginated results. + """ + course_id = self.kwargs["course_id"] + course_key = CourseKey.from_string(course_id) + + # Get the certificate queryset + queryset = self.filter_queryset(self.get_queryset()) + + # Get query parameters + filter_type = self.request.query_params.get("filter", "all") + # Get related data allowlist_dict = self._get_allowlist_dict(course_key, CertificateAllowlist) invalidation_dict = self._get_invalidation_dict(course_key, CertificateInvalidation) @@ -1288,14 +1309,10 @@ def get_queryset(self): # Build result list results = [] - for cert in certificates: + for cert in queryset: user = cert.user - # Apply search filter - if search and search not in user.username.lower() and search not in user.email.lower(): - continue - - # Apply special case filters + # Apply special case filters that can't be done at database level allowlist_info = allowlist_dict.get(user.id) invalidation_info = invalidation_dict.get(user.id) @@ -1311,7 +1328,14 @@ def get_queryset(self): # Sort by username results.sort(key=lambda x: x['username']) - return results + # Apply pagination + page = self.paginate_queryset(results) + if page is not None: + serializer = self.get_serializer(page, many=True) + return self.get_paginated_response(serializer.data) + + serializer = self.get_serializer(results, many=True) + return Response(serializer.data) class CertificateGenerationHistoryView(ListAPIView): @@ -1364,9 +1388,10 @@ class CertificateGenerationHistoryView(ListAPIView): def get_queryset(self): """ Returns the queryset of certificate generation history. - """ - from lms.djangoapps.certificates.models import CertificateGenerationHistory + This method returns a Django QuerySet that will be further processed + by the list() method to build the final response data. + """ course_id = self.kwargs["course_id"] course_key = CourseKey.from_string(course_id) @@ -1378,25 +1403,50 @@ def get_queryset(self): course_id=course_key ).select_related('generated_by', 'instructor_task').order_by('-created') - # Build result list - results = [] - for entry in history: - # Determine task name - task_name = "Regenerated" if entry.is_regeneration else "Generated" + return history - # Format date - date = entry.created.strftime("%B %d, %Y") + def _process_history_entry(self, entry): + """ + Process a single certificate generation history entry into a result dictionary. - # Get details about what was generated/regenerated - details = str(entry.get_certificate_generation_candidates()) + Args: + entry: CertificateGenerationHistory instance - results.append({ - 'task_name': task_name, - 'date': date, - 'details': details, - }) + Returns: + dict: Processed history entry with task_name, date (ISO 8601), and details + """ + # Determine task name + task_name = "Regenerated" if entry.is_regeneration else "Generated" - return results + # Format date in ISO 8601 format for frontend to handle display formatting + date = entry.created.isoformat() + + # Get details about what was generated/regenerated + details = str(entry.get_certificate_generation_candidates()) + + return { + 'task_name': task_name, + 'date': date, + 'details': details, + } + + def list(self, request, *args, **kwargs): + """ + Override list method to process history entries and return paginated results. + """ + queryset = self.filter_queryset(self.get_queryset()) + + # Build result list + results = [self._process_history_entry(entry) for entry in queryset] + + # Apply pagination + page = self.paginate_queryset(results) + if page is not None: + serializer = self.get_serializer(page, many=True) + return self.get_paginated_response(serializer.data) + + serializer = self.get_serializer(results, many=True) + return Response(serializer.data) @method_decorator(cache_control(no_cache=True, no_store=True, must_revalidate=True), name='dispatch') @@ -1560,8 +1610,6 @@ def get(self, request, course_id): """ Retrieve certificate configuration. """ - from lms.djangoapps.certificates import api as certs_api - course_key = CourseKey.from_string(course_id) # Validate that the course exists get_course_by_id(course_key) diff --git a/lms/djangoapps/instructor/views/serializers_v2.py b/lms/djangoapps/instructor/views/serializers_v2.py index fe925ad4f9d1..ea4996d917ab 100644 --- a/lms/djangoapps/instructor/views/serializers_v2.py +++ b/lms/djangoapps/instructor/views/serializers_v2.py @@ -495,27 +495,38 @@ class IssuedCertificateSerializer(serializers.Serializer): """ username = serializers.CharField(help_text="Username of the learner") email = serializers.EmailField(help_text="Email address of the learner") - enrollment_track = serializers.CharField(help_text="Enrollment track/mode (e.g., verified, audit)") - certificate_status = serializers.CharField(help_text="Certificate status (e.g., downloadable, notpassing)") - special_case = serializers.CharField( + enrollmentTrack = serializers.CharField( + source='enrollment_track', + help_text="Enrollment track/mode (e.g., verified, audit)" + ) + certificateStatus = serializers.CharField( + source='certificate_status', + help_text="Certificate status (e.g., downloadable, notpassing)" + ) + specialCase = serializers.CharField( + source='special_case', allow_null=True, help_text="Special case type (Exception or Invalidation)" ) - exception_granted = serializers.CharField( + exceptionGranted = serializers.CharField( + source='exception_granted', allow_null=True, - help_text="Date when exception was granted" + help_text="Date when exception was granted in ISO 8601 format" ) - exception_notes = serializers.CharField( + exceptionNotes = serializers.CharField( + source='exception_notes', allow_null=True, help_text="Notes about the exception" ) - invalidated_by = serializers.CharField( + invalidatedBy = serializers.CharField( + source='invalidated_by', allow_null=True, help_text="Email of user who invalidated the certificate" ) - invalidation_date = serializers.CharField( + invalidationDate = serializers.CharField( + source='invalidation_date', allow_null=True, - help_text="Date when certificate was invalidated" + help_text="Date when certificate was invalidated in ISO 8601 format" ) @@ -523,8 +534,11 @@ class CertificateGenerationHistorySerializer(serializers.Serializer): """ Serializer for certificate generation history. """ - task_name = serializers.CharField(help_text="Task name (Generated or Regenerated)") - date = serializers.CharField(help_text="Date when the task was created (formatted)") + taskName = serializers.CharField( + source='task_name', + help_text="Task name (Generated or Regenerated)" + ) + date = serializers.CharField(help_text="Date when the task was created in ISO 8601 format") details = serializers.CharField( help_text="Details about the certificate generation (e.g., 'audit not passing states', 'For exceptions')" ) From 13584fb824e22c2f314fe1c5e605ad38ee198ea8 Mon Sep 17 00:00:00 2001 From: Jesse Stewart Date: Wed, 18 Mar 2026 19:15:39 -0400 Subject: [PATCH 4/4] feat: fixes linting --- .../instructor/tests/test_api_v2.py | 10 +++++--- lms/djangoapps/instructor/views/api_v2.py | 25 +++++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/lms/djangoapps/instructor/tests/test_api_v2.py b/lms/djangoapps/instructor/tests/test_api_v2.py index 3bcb69a4c445..5ef3011abfbc 100644 --- a/lms/djangoapps/instructor/tests/test_api_v2.py +++ b/lms/djangoapps/instructor/tests/test_api_v2.py @@ -1823,9 +1823,11 @@ def test_search_filter(self, mock_filter): """ Test filtering certificates by search term. """ - # Mock queryset methods + # Mock queryset methods - must be fully iterable mock_queryset = Mock() mock_queryset.select_related.return_value = mock_queryset + mock_queryset.filter.return_value = mock_queryset + mock_queryset.count.return_value = 0 mock_queryset.__iter__ = Mock(return_value=iter([])) mock_filter.return_value = mock_queryset @@ -1975,12 +1977,12 @@ def test_history_entry_structure(self, mock_filter): if results: entry = results[0] - # Verify all required fields are present - self.assertIn('task_name', entry) + # Verify all required fields are present (camelCase from serializer) + self.assertIn('taskName', entry) self.assertIn('date', entry) self.assertIn('details', entry) # Verify data types - self.assertIsInstance(entry['task_name'], str) + self.assertIsInstance(entry['taskName'], str) self.assertIsInstance(entry['date'], str) self.assertIsInstance(entry['details'], str) diff --git a/lms/djangoapps/instructor/views/api_v2.py b/lms/djangoapps/instructor/views/api_v2.py index c7b1d8c47a94..4c3e340e2145 100644 --- a/lms/djangoapps/instructor/views/api_v2.py +++ b/lms/djangoapps/instructor/views/api_v2.py @@ -1172,26 +1172,26 @@ class IssuedCertificatesView(ListAPIView): permission_name = permissions.VIEW_DASHBOARD serializer_class = IssuedCertificateSerializer - def _apply_certificate_status_filter(self, certificates, filter_type, CertificateStatuses): + def _apply_certificate_status_filter(self, certificates, filter_type, cert_statuses): """Apply status-based filters to certificate queryset.""" if filter_type == "received": - return certificates.filter(status=CertificateStatuses.downloadable) + return certificates.filter(status=cert_statuses.downloadable) elif filter_type == "not_received": return certificates.filter( - status__in=[CertificateStatuses.notpassing, CertificateStatuses.unavailable] + status__in=[cert_statuses.notpassing, cert_statuses.unavailable] ) elif filter_type == "audit_passing": - return certificates.filter(status=CertificateStatuses.audit_passing) + return certificates.filter(status=cert_statuses.audit_passing) elif filter_type == "audit_not_passing": - return certificates.filter(status=CertificateStatuses.audit_notpassing) + return certificates.filter(status=cert_statuses.audit_notpassing) elif filter_type == "error": - return certificates.filter(status=CertificateStatuses.error) + return certificates.filter(status=cert_statuses.error) return certificates - def _get_allowlist_dict(self, course_key, CertificateAllowlist): + def _get_allowlist_dict(self, course_key, cert_allowlist): """Get allowlist entries as a dictionary keyed by user_id.""" allowlist_dict = {} - allowlist_entries = CertificateAllowlist.objects.filter( + allowlist_entries = cert_allowlist.objects.filter( course_id=course_key, allowlist=True ).select_related('user') @@ -1203,10 +1203,10 @@ def _get_allowlist_dict(self, course_key, CertificateAllowlist): } return allowlist_dict - def _get_invalidation_dict(self, course_key, CertificateInvalidation): + def _get_invalidation_dict(self, course_key, cert_invalidation): """Get invalidation entries as a dictionary keyed by user_id.""" invalidation_dict = {} - invalidations = CertificateInvalidation.objects.filter( + invalidations = cert_invalidation.objects.filter( generated_certificate__course_id=course_key, active=True ).select_related('generated_certificate__user', 'invalidated_by') @@ -1277,7 +1277,10 @@ def get_queryset(self): ) # Debug logging - log.debug(f"Certificate query for course {course_key}: found {certificates.count()} certificates, filter_type: {filter_type}") + log.debug( + f"Certificate query for course {course_key}: " + f"found {certificates.count()} certificates, filter_type: {filter_type}" + ) # Apply filter based on filter type certificates = self._apply_certificate_status_filter(certificates, filter_type, CertificateStatuses)