diff --git a/openedx/core/djangoapps/notifications/email/tasks.py b/openedx/core/djangoapps/notifications/email/tasks.py index 2e7c802e5140..9be5aaede384 100644 --- a/openedx/core/djangoapps/notifications/email/tasks.py +++ b/openedx/core/djangoapps/notifications/email/tasks.py @@ -9,6 +9,7 @@ from django.conf import settings from django.contrib.auth import get_user_model from django.db import transaction +from django.utils import timezone as django_timezone from django.utils.translation import gettext as _, override as translation_override from edx_ace import ace from edx_ace.recipient import Recipient @@ -17,6 +18,7 @@ from openedx.core.djangoapps.notifications.email_notifications import EmailCadence from openedx.core.djangoapps.notifications.models import ( + DigestSchedule, Notification, NotificationPreference, ) @@ -114,27 +116,334 @@ def send_digest_email_to_user( message = add_headers_to_email_message(message, message_context) message.options['skip_disable_user_policy'] = True ace.send(message) - notifications.update(email_sent_on=datetime.now()) + notifications.update(email_sent_on=django_timezone.now()) send_user_email_digest_sent_event(user, cadence_type, notifications_list, message_context) logger.info(f' Email sent to {user.username} ==Temp Log==') -@shared_task(ignore_result=True) +def get_next_digest_delivery_time(cadence_type): + """ + Calculate the next delivery time for a digest email based on cadence type. + + Uses Django settings for configurable delivery time/day: + - NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR (default: 17) + - NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE (default: 0) + - NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY (default: 0 = Monday) + - NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR (default: 17) + - NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE (default: 0) + + Returns: + datetime: The next scheduled delivery time in UTC. + """ + now = django_timezone.now() + + if cadence_type == EmailCadence.DAILY: + delivery_hour = max(0, min(23, getattr(settings, 'NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR', 17))) + delivery_minute = max(0, min(59, getattr(settings, 'NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE', 0))) + + # Calculate next delivery time + delivery_time = now.replace( + hour=delivery_hour, + minute=delivery_minute, + second=0, + microsecond=0 + ) + # If the delivery time has already passed today, schedule for tomorrow + if delivery_time <= now: + delivery_time += timedelta(days=1) + + return delivery_time + + elif cadence_type == EmailCadence.WEEKLY: + delivery_day = max(0, min(6, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY', 0))) # 0=Monday + delivery_hour = max(0, min(23, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR', 17))) + delivery_minute = max(0, min(59, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE', 0))) + + # Calculate next delivery day + days_ahead = delivery_day - now.weekday() + if days_ahead < 0: + days_ahead += 7 + + delivery_time = now.replace( + hour=delivery_hour, + minute=delivery_minute, + second=0, + microsecond=0 + ) + timedelta(days=days_ahead) + + # If the delivery time is today but has already passed, schedule for next week + if delivery_time <= now: + delivery_time += timedelta(days=7) + + return delivery_time + + raise ValueError(f"Invalid cadence_type for digest scheduling: {cadence_type}") + + +def get_digest_dedupe_key(user_id, cadence_type, delivery_time): + """ + Generate a deduplication key for a digest email task. + + This key ensures that only one digest task is scheduled per user per cadence period. + + Returns: + str: A unique key based on user_id, cadence_type, and delivery window. + """ + window_key = delivery_time.strftime('%Y-%m-%d-%H') + return f"digest:{user_id}:{cadence_type}:{window_key}" + + +def is_digest_already_scheduled(user_id, cadence_type, delivery_time): + """ + Check if a digest email is already scheduled for this user in the current cadence window. + + This prevents duplicate scheduling when multiple notifications arrive + in the same digest window. + + Uses DigestSchedule model for an exact (user, cadence_type, delivery_time) lookup — + one record represents one pending Celery task. This is intentionally separate from + Notification.email_scheduled, which tracks the immediate/buffer cadence flow and + operates at the notification row level rather than the task level. + """ + if cadence_type not in [EmailCadence.DAILY, EmailCadence.WEEKLY]: + return False + + return DigestSchedule.objects.filter( + user_id=user_id, + cadence_type=cadence_type, + delivery_time=delivery_time, + ).exists() + + +def is_digest_already_sent_in_window(user_id, cadence_type, delivery_time): + """ + Check if a digest email has already been sent for this user in the current cadence window. + + This prevents duplicate emails when both cron jobs and delayed tasks co-exist. + """ + if cadence_type == EmailCadence.DAILY: + window_start = delivery_time - timedelta(days=1) + elif cadence_type == EmailCadence.WEEKLY: + window_start = delivery_time - timedelta(days=7) + else: + return False + + return Notification.objects.filter( + user_id=user_id, + email=True, + email_sent_on__gte=window_start, + email_sent_on__lte=delivery_time, + ).exists() + + +def schedule_user_digest_email(user_id, cadence_type): + """ + Schedule a delayed Celery task to send a digest email to a user. + + This is called when a notification is created for a user who has + Daily or Weekly email cadence. It: + 1. Calculates the next delivery time based on settings + 2. Checks if a digest task is already scheduled for this window + 3. Marks the notification as scheduled + 4. Schedules a delayed Celery task with apply_async(eta=...) + + The check-then-act logic is wrapped in a transaction to prevent + race conditions when multiple notifications arrive concurrently. + + Args: + user_id: ID of the user to send digest to + cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY + """ + if not is_email_notification_flag_enabled(): + return + + if cadence_type not in [EmailCadence.DAILY, EmailCadence.WEEKLY]: + logger.warning(f' Invalid cadence_type {cadence_type} for user {user_id}') + return + + delivery_time = get_next_digest_delivery_time(cadence_type) + + + with transaction.atomic(): + + task_id = get_digest_dedupe_key(user_id, cadence_type, delivery_time) + _schedule, created = DigestSchedule.objects.get_or_create( + user_id=user_id, + cadence_type=cadence_type, + delivery_time=delivery_time, + defaults={'task_id': task_id}, + ) + + if not created: + # Another worker already scheduled this window. + logger.info( + f' Digest already scheduled for user {user_id}, ' + f'cadence={cadence_type}, delivery_time={delivery_time}' + ) + return + + if is_digest_already_sent_in_window(user_id, cadence_type, delivery_time): + logger.info( + f' Digest already sent for user {user_id} in this window, ' + f'cadence={cadence_type}, delivery_time={delivery_time}' + ) + # Remove the record we just created — no task needed. + _schedule.delete() + return + + # Mark unscheduled notifications for this user as scheduled. + + if cadence_type == EmailCadence.DAILY: + window_start = delivery_time - timedelta(days=1) + else: + window_start = delivery_time - timedelta(days=7) + + updated = Notification.objects.filter( + user_id=user_id, + email=True, + email_scheduled=False, + email_sent_on__isnull=True, + created__gte=window_start, + ).update(email_scheduled=True) + + if updated == 0: + logger.info( + f' No unsent notifications to schedule for user {user_id}' + ) + # Remove the record — nothing to deliver. + _schedule.delete() + return + + # Schedule the delayed celery task + send_user_digest_email_task.apply_async( + kwargs={ + 'user_id': user_id, + 'cadence_type': cadence_type, + }, + eta=delivery_time, + task_id=task_id, + ) + + logger.info( + f' Scheduled {cadence_type} digest for user {user_id} ' + f'at {delivery_time} (task_id={task_id})' + ) + + +@shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300) @set_code_owner_attribute -def send_digest_email_to_all_users(cadence_type): +def send_user_digest_email_task(self, user_id, cadence_type): """ - Send email digest to all eligible users + Delayed Celery task to send a digest email to a single user. + + This task is scheduled with apply_async(eta=...) for the configured + delivery time. When it fires: + 1. Checks if email was already sent (by cron job) to avoid duplicates + 2. Gathers all unsent notifications for the cadence window + 3. Sends the digest email + 4. Marks notifications as sent """ - logger.info(f' Sending cadence email of type {cadence_type}') - users = get_audience_for_cadence_email(cadence_type) - language_prefs = get_language_preference_for_users([user.id for user in users]) - courses_data = {} - start_date, end_date = get_start_end_date(cadence_type) - logger.info(f' Email Cadence Audience {len(users)}') - for user in users: - user_language = language_prefs.get(user.id, 'en') - send_digest_email_to_user(user, cadence_type, start_date, end_date, user_language=user_language, - courses_data=courses_data) + try: + if not ENABLE_EMAIL_NOTIFICATIONS.is_enabled(): + logger.info(f' Email notifications disabled, skipping user {user_id}') + return + + user = User.objects.get(id=user_id) + + if not user.has_usable_password(): + logger.info(f' User {user.username} is disabled, skipping') + _cleanup_digest_schedule_for_current_window(user_id, cadence_type) + return + + if not is_email_notification_flag_enabled(user): + logger.info(f' Email flag disabled for user {user.username}') + _cleanup_digest_schedule_for_current_window(user_id, cadence_type) + return + + start_date, end_date = get_start_end_date(cadence_type) + + already_sent = Notification.objects.filter( + user_id=user_id, + email=True, + email_sent_on__gte=start_date, + email_sent_on__lte=end_date, + ).exists() + + if already_sent: + logger.info( + f' Digest already sent for user {user.username} ' + f'in window {start_date} to {end_date}. Clearing scheduled flags.' + ) + # Clear scheduled flags so they're not picked up again + Notification.objects.filter( + user_id=user_id, + email=True, + email_scheduled=True, + created__gte=start_date, + created__lte=end_date, + ).update(email_scheduled=False) + _cleanup_digest_schedule_for_current_window(user_id, cadence_type) + return + + language_prefs = get_language_preference_for_users([user_id]) + user_language = language_prefs.get(user_id, 'en') + courses_data = {} + + send_digest_email_to_user( + user, cadence_type, start_date, end_date, + user_language=user_language, + courses_data=courses_data + ) + + # Clear scheduled flags after successful send + Notification.objects.filter( + user_id=user_id, + email=True, + email_scheduled=True, + created__gte=start_date, + created__lte=end_date, + ).update(email_scheduled=False) + + # Remove only the current window's DigestSchedule record — future + # windows that may have been scheduled concurrently must be preserved. + _cleanup_digest_schedule_for_current_window(user_id, cadence_type) + + logger.info(f' Successfully sent {cadence_type} digest to user {user.username}') + + except User.DoesNotExist: + logger.error(f' User {user_id} not found') + # Clean up the orphaned DigestSchedule so future windows are not blocked. + _cleanup_digest_schedule_for_current_window(user_id, cadence_type) + + except Exception as exc: + logger.exception(f' Failed to send digest to user {user_id}: {exc}') + retry_countdown = 300 * (2 ** self.request.retries) + raise self.retry(exc=exc, countdown=retry_countdown) + + +def _cleanup_digest_schedule_for_current_window(user_id, cadence_type): + """ + Remove DigestSchedule records only for the current delivery window. + + This ensures that a future window's DigestSchedule (created when a new + notification arrives after the current task was scheduled) is preserved. + """ + now = django_timezone.now() + + if cadence_type == EmailCadence.DAILY: + # The current window's delivery_time is at most 1 day + buffer in the past + window_cutoff = now - timedelta(days=1, hours=1) + elif cadence_type == EmailCadence.WEEKLY: + window_cutoff = now - timedelta(days=7, hours=1) + else: + return + + DigestSchedule.objects.filter( + user_id=user_id, + cadence_type=cadence_type, + delivery_time__lte=now, + delivery_time__gte=window_cutoff, + ).delete() def send_immediate_cadence_email(email_notification_mapping, course_key): diff --git a/openedx/core/djangoapps/notifications/email/tests/test_tasks.py b/openedx/core/djangoapps/notifications/email/tests/test_tasks.py index 4fdcf1aaf04e..16703a864b95 100644 --- a/openedx/core/djangoapps/notifications/email/tests/test_tasks.py +++ b/openedx/core/djangoapps/notifications/email/tests/test_tasks.py @@ -2,7 +2,7 @@ Test cases for notifications/email/tasks.py """ import datetime -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone as dt_timezone from unittest.mock import Mock, patch import ddt @@ -15,19 +15,26 @@ from common.djangoapps.student.tests.factories import UserFactory from openedx.core.djangoapps.notifications.config.waffle import ENABLE_EMAIL_NOTIFICATIONS, ENABLE_NOTIFICATIONS from openedx.core.djangoapps.notifications.email.tasks import ( + _cleanup_digest_schedule_for_current_window, add_to_existing_buffer, decide_email_action, get_audience_for_cadence_email, + get_next_digest_delivery_time, + is_digest_already_scheduled, + is_digest_already_sent_in_window, schedule_digest_buffer, + schedule_user_digest_email, send_buffered_digest, send_digest_email_to_all_users, send_digest_email_to_user, send_immediate_cadence_email, - send_immediate_email + send_immediate_email, + send_user_digest_email_task, ) from openedx.core.djangoapps.notifications.email.utils import get_start_end_date from openedx.core.djangoapps.notifications.email_notifications import EmailCadence from openedx.core.djangoapps.notifications.models import ( + DigestSchedule, Notification, NotificationPreference ) @@ -433,10 +440,12 @@ def test_email_sent_when_cadence_is_immediate(self, mock_ace_send): assert mock_ace_send.call_count == 1 + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') @patch('edx_ace.ace.send') - def test_email_not_sent_when_cadence_is_not_immediate(self, mock_ace_send): + def test_email_not_sent_when_cadence_is_not_immediate(self, mock_ace_send, mock_apply_async): """ Tests that an email is NOT sent via send_notifications when cadence is DAILY. + The digest is scheduled for later delivery — ace.send must not be called immediately. """ # Modify preference for this test case self.preference.email = True @@ -1199,3 +1208,849 @@ def get_new_post_notification_content_context(**kwargs): "email_content": "

Email content

", **kwargs } + + +@ddt.ddt +class TestGetNextDigestDeliveryTime(ModuleStoreTestCase): + """Tests for get_next_digest_delivery_time function.""" + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) # Friday 10 AM UTC + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + def test_daily_delivery_time_later_today(self): + """Test daily delivery is scheduled for later today if time hasn't passed.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.DAILY) + assert delivery_time.hour == 17 + assert delivery_time.minute == 0 + assert delivery_time.day == 6 # Today + + @freeze_time("2026-03-06 18:00:00", tz_offset=0) # Friday 6 PM UTC + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + def test_daily_delivery_time_tomorrow_if_passed(self): + """Test daily delivery is scheduled for tomorrow if today's time has passed.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.DAILY) + assert delivery_time.hour == 17 + assert delivery_time.day == 7 # Tomorrow + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) # Friday + @override_settings( + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY=0, # Monday + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR=17, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE=0 + ) + def test_weekly_delivery_time_next_monday(self): + """Test weekly delivery scheduled for next Monday.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.WEEKLY) + assert delivery_time.weekday() == 0 # Monday + assert delivery_time.hour == 17 + assert delivery_time.day == 9 # Next Monday (March 9) + + @freeze_time("2026-03-09 10:00:00", tz_offset=0) # Monday 10 AM UTC + @override_settings( + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY=0, # Monday + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR=17, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE=0 + ) + def test_weekly_delivery_time_today_if_not_passed(self): + """Test weekly delivery scheduled for today if it's the right day and time hasn't passed.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.WEEKLY) + assert delivery_time.weekday() == 0 # Monday + assert delivery_time.day == 9 # Today + assert delivery_time.hour == 17 + + @freeze_time("2026-03-09 18:00:00", tz_offset=0) # Monday 6 PM UTC + @override_settings( + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY=0, # Monday + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR=17, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE=0 + ) + def test_weekly_delivery_time_next_week_if_passed(self): + """Test weekly delivery scheduled for next week if today's time has passed.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.WEEKLY) + assert delivery_time.weekday() == 0 # Monday + assert delivery_time.day == 16 # Next Monday + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings( + NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=9, + NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=30 + ) + def test_daily_custom_delivery_time(self): + """Test custom delivery hour and minute from settings.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.DAILY) + # 9:30 has passed (it's 10:00), so should be tomorrow + assert delivery_time.day == 7 + assert delivery_time.hour == 9 + assert delivery_time.minute == 30 + + def test_invalid_cadence_raises_error(self): + """Test that invalid cadence type raises ValueError.""" + with self.assertRaises(ValueError): + get_next_digest_delivery_time(EmailCadence.IMMEDIATELY) + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings( + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY=4, # Friday + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR=17, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE=0 + ) + def test_weekly_delivery_same_day_future_time(self): + """Test weekly delivery on same weekday but later time.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.WEEKLY) + assert delivery_time.weekday() == 4 # Friday + assert delivery_time.day == 6 # Today (Friday) + assert delivery_time.hour == 17 + + +@ddt.ddt +class TestIsDigestAlreadyScheduled(ModuleStoreTestCase): + """Tests for is_digest_already_scheduled function.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + self.course = CourseFactory.create() + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + def test_no_scheduled_notifications(self): + """Test returns False when no DigestSchedule record exists.""" + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + assert is_digest_already_scheduled(self.user.id, EmailCadence.DAILY, delivery_time) is False + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + def test_has_scheduled_notification(self): + """Test returns True when a DigestSchedule record exists for the exact delivery_time.""" + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=delivery_time, + task_id='test-task-id', + ) + assert is_digest_already_scheduled(self.user.id, EmailCadence.DAILY, delivery_time) is True + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + def test_scheduled_notification_outside_window(self): + """Test returns False when DigestSchedule record has a different delivery_time.""" + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + different_delivery_time = datetime(2026, 3, 5, 17, 0, tzinfo=dt_timezone.utc) # Yesterday + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=different_delivery_time, + task_id='test-task-id', + ) + assert is_digest_already_scheduled(self.user.id, EmailCadence.DAILY, delivery_time) is False + + +@ddt.ddt +class TestIsDigestAlreadySentInWindow(ModuleStoreTestCase): + """Tests for is_digest_already_sent_in_window function.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + self.course = CourseFactory.create() + + def test_no_sent_notifications(self): + """Test returns False when no digest has been sent.""" + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + assert is_digest_already_sent_in_window(self.user.id, EmailCadence.DAILY, delivery_time) is False + + def test_has_sent_notification_in_window(self): + """Test returns True when digest was already sent in window.""" + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_sent_on=datetime(2026, 3, 6, 10, 0, tzinfo=dt_timezone.utc), + ) + assert is_digest_already_sent_in_window(self.user.id, EmailCadence.DAILY, delivery_time) is True + + +@ddt.ddt +class TestScheduleUserDigestEmail(ModuleStoreTestCase): + """Tests for schedule_user_digest_email function.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + self.course = CourseFactory.create() + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_schedules_daily_digest_task(self, mock_apply_async): + """Test that a daily digest task is scheduled when notification exists.""" + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_scheduled=False, + email_sent_on=None, + ) + + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.DAILY) + + assert mock_apply_async.called + call_kwargs = mock_apply_async.call_args[1] + assert call_kwargs['kwargs']['user_id'] == self.user.id + assert call_kwargs['kwargs']['cadence_type'] == EmailCadence.DAILY + # Should be scheduled for 5 PM UTC today + assert call_kwargs['eta'].hour == 17 + assert call_kwargs['eta'].day == 6 + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_does_not_schedule_if_already_scheduled(self, mock_apply_async): + """Test that no duplicate task is scheduled when a DigestSchedule record already exists.""" + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=delivery_time, + task_id='existing-task-id', + ) + + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.DAILY) + + assert not mock_apply_async.called + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_does_not_schedule_if_already_sent_in_window(self, mock_apply_async): + """Test that no task is scheduled if cron already sent digest.""" + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_scheduled=False, + email_sent_on=datetime(2026, 3, 6, 8, 0, tzinfo=dt_timezone.utc), # Sent earlier today + ) + + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.DAILY) + + assert not mock_apply_async.called + # Verify no stale DigestSchedule record was left behind + assert not DigestSchedule.objects.filter(user=self.user).exists() + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_invalid_cadence_does_not_schedule(self, mock_apply_async): + """Test that IMMEDIATELY cadence does not schedule a digest.""" + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.IMMEDIATELY) + assert not mock_apply_async.called + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings( + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY=0, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR=17, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE=0 + ) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_schedules_weekly_digest_task(self, mock_apply_async): + """Test that a weekly digest task is scheduled correctly.""" + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_scheduled=False, + email_sent_on=None, + ) + + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.WEEKLY) + + assert mock_apply_async.called + call_kwargs = mock_apply_async.call_args[1] + assert call_kwargs['kwargs']['cadence_type'] == EmailCadence.WEEKLY + # Should be scheduled for next Monday 5 PM + assert call_kwargs['eta'].weekday() == 0 + assert call_kwargs['eta'].hour == 17 + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_marks_notifications_as_scheduled(self, mock_apply_async): + """Test that notifications are marked as email_scheduled=True.""" + notif = Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_scheduled=False, + email_sent_on=None, + ) + + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.DAILY) + + notif.refresh_from_db() + assert notif.email_scheduled is True + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_creates_digest_schedule_record(self, mock_apply_async): + """Test that a DigestSchedule record is created after scheduling.""" + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_scheduled=False, + email_sent_on=None, + ) + + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.DAILY) + + delivery_time = datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc) + assert DigestSchedule.objects.filter( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=delivery_time, + ).exists() + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_does_not_schedule_if_no_unsent_notifications(self, mock_apply_async): + """Test that no task is scheduled if there are no unsent notifications.""" + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + schedule_user_digest_email(self.user.id, EmailCadence.DAILY) + assert not mock_apply_async.called + # Verify no stale DigestSchedule record was left behind + assert not DigestSchedule.objects.filter(user=self.user).exists() + + +@ddt.ddt +class TestSendUserDigestEmailTask(ModuleStoreTestCase): + """Tests for the send_user_digest_email_task celery task.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + self.course = CourseFactory.create(display_name='Test Course') + + NotificationPreference.objects.filter(user=self.user).delete() + NotificationPreference.objects.create( + user=self.user, + app='discussion', + type='new_discussion_post', + email=True, + email_cadence=EmailCadence.DAILY, + ) + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + def test_sends_digest_email(self, mock_ace_send): + """Test that digest email is sent successfully.""" + created_time = datetime(2026, 3, 6, 10, 0, tzinfo=dt_timezone.utc) + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + content_context=get_new_post_notification_content_context(), + email=True, + email_scheduled=True, + created=created_time, + ) + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=self.user.id, + cadence_type=EmailCadence.DAILY, + ) + + assert mock_ace_send.called + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + def test_skips_if_already_sent_by_cron(self, mock_ace_send): + """Test that digest is skipped if cron already sent it.""" + created_time = datetime(2026, 3, 6, 10, 0, tzinfo=dt_timezone.utc) + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + content_context=get_new_post_notification_content_context(), + email=True, + email_scheduled=True, + email_sent_on=datetime(2026, 3, 6, 15, 0, tzinfo=dt_timezone.utc), # Already sent by cron + created=created_time, + ) + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=self.user.id, + cadence_type=EmailCadence.DAILY, + ) + + assert not mock_ace_send.called + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + def test_clears_scheduled_flags_after_send(self, mock_ace_send): + """Test that email_scheduled flags are cleared after successful send.""" + created_time = datetime(2026, 3, 6, 10, 0, tzinfo=dt_timezone.utc) + notif = Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + content_context=get_new_post_notification_content_context(), + email=True, + email_scheduled=True, + created=created_time, + ) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + task_id='test-task-id', + ) + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=self.user.id, + cadence_type=EmailCadence.DAILY, + ) + + notif.refresh_from_db() + assert notif.email_scheduled is False + assert not DigestSchedule.objects.filter(user=self.user, cadence_type=EmailCadence.DAILY).exists() + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + def test_skips_disabled_user(self, mock_ace_send): + """Test that digest is not sent to disabled user and DigestSchedule is cleaned up.""" + self.user.set_unusable_password() + self.user.save() + + created_time = datetime(2026, 3, 6, 10, 0, tzinfo=dt_timezone.utc) + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + content_context=get_new_post_notification_content_context(), + email=True, + email_scheduled=True, + created=created_time, + ) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + task_id='test-task-id', + ) + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=self.user.id, + cadence_type=EmailCadence.DAILY, + ) + + assert not mock_ace_send.called + # Verify DigestSchedule was cleaned up even though user is disabled + assert not DigestSchedule.objects.filter( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + ).exists() + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + def test_handles_missing_user(self): + """Test that task handles non-existent user gracefully and cleans up DigestSchedule.""" + # Create a DigestSchedule record for the non-existent user + DigestSchedule.objects.create( + user_id=99999, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + task_id='orphan-task-id', + ) + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + # Should not raise + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=99999, + cadence_type=EmailCadence.DAILY, + ) + + # Verify orphaned DigestSchedule was cleaned up + assert not DigestSchedule.objects.filter(user_id=99999).exists() + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + def test_skips_when_email_flag_disabled(self, mock_ace_send): + """Test that task skips when email notification flag is disabled.""" + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, False): + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=self.user.id, + cadence_type=EmailCadence.DAILY, + ) + assert not mock_ace_send.called + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + def test_clears_scheduled_flags_even_when_cron_sent(self, mock_ace_send): + """Test that scheduled flags and DigestSchedule record are cleared even when cron already sent.""" + created_time = datetime(2026, 3, 6, 10, 0, tzinfo=dt_timezone.utc) + notif = Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + content_context=get_new_post_notification_content_context(), + email=True, + email_scheduled=True, + email_sent_on=datetime(2026, 3, 6, 15, 0, tzinfo=dt_timezone.utc), # Sent by cron + created=created_time, + ) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + task_id='test-task-id', + ) + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_user_digest_email_task( # pylint: disable=no-value-for-parameter + user_id=self.user.id, + cadence_type=EmailCadence.DAILY, + ) + + notif.refresh_from_db() + assert notif.email_scheduled is False + assert not mock_ace_send.called + assert not DigestSchedule.objects.filter(user=self.user, cadence_type=EmailCadence.DAILY).exists() + + +@ddt.ddt +class TestDeprecatedSendDigestEmailToAllUsers(ModuleStoreTestCase): + """Tests for the deprecated send_digest_email_to_all_users task.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + self.course = CourseFactory.create(display_name='test course') + + # Ensure a NotificationPreference exists so the cadence-aware skip + # logic in the deprecated task can discover notification types + # belonging to the Daily cadence. + NotificationPreference.objects.filter(user=self.user).delete() + NotificationPreference.objects.create( + user=self.user, + app='discussion', + type='new_discussion_post', + email=True, + email_cadence=EmailCadence.DAILY, + ) + + @patch('openedx.core.djangoapps.notifications.email.tasks.send_digest_email_to_user') + def test_skips_users_with_already_sent_digest(self, mock_send_digest): + """Test that users who already received digest from delayed task are skipped.""" + created_time = datetime.now() - timedelta(hours=12) + notif = Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_sent_on=datetime.now() - timedelta(hours=1), # Already sent by delayed task + created=created_time, + ) + + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_digest_email_to_all_users(EmailCadence.DAILY) + + assert not mock_send_digest.called + + @patch('openedx.core.djangoapps.notifications.email.tasks.send_digest_email_to_user') + def test_sends_to_users_without_prior_digest(self, mock_send_digest): + """Test that users who haven't received digest are still processed.""" + created_time = datetime.now() - timedelta(hours=12) + Notification.objects.create( + user=self.user, + course_id=str(self.course.id), + app_name='discussion', + notification_type='new_discussion_post', + content_url='http://example.com', + email=True, + email_sent_on=None, # Not yet sent + created=created_time, + ) + + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_digest_email_to_all_users(EmailCadence.DAILY) + + assert mock_send_digest.called + + +@ddt.ddt +class TestDigestSchedulingIntegration(ModuleStoreTestCase): + """Integration tests for the full digest scheduling flow.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + self.course = CourseFactory.create(display_name='Test Course') + + NotificationPreference.objects.filter(user=self.user).delete() + NotificationPreference.objects.create( + user=self.user, + app='discussion', + type='new_discussion_post', + email=True, + email_cadence=EmailCadence.DAILY, + ) + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_notification_triggers_digest_scheduling(self, mock_apply_async): + """Test that creating a notification triggers digest scheduling via send_notifications.""" + context = { + 'username': 'User', + 'post_title': 'Test Post' + } + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_notifications( + [self.user.id], + str(self.course.id), + 'discussion', + 'new_discussion_post', + context, + 'http://test.url' + ) + + # A digest task should have been scheduled + assert mock_apply_async.called + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_multiple_notifications_schedule_only_once(self, mock_apply_async): + """Test that multiple notifications in same window only schedule one task.""" + context = { + 'username': 'User', + 'post_title': 'Test Post' + } + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_notifications( + [self.user.id], + str(self.course.id), + 'discussion', + 'new_discussion_post', + context.copy(), + 'http://test.url' + ) + send_notifications( + [self.user.id], + str(self.course.id), + 'discussion', + 'new_discussion_post', + context.copy(), + 'http://test.url' + ) + + # Should be called only once because second time notifications are already scheduled + assert mock_apply_async.call_count == 1 + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=17, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=0) + @patch('openedx.core.djangoapps.notifications.email.tasks.ace.send') + @patch('openedx.core.djangoapps.notifications.email.tasks.send_user_digest_email_task.apply_async') + def test_immediate_cadence_does_not_trigger_digest(self, mock_digest_async, mock_ace_send): + """Test that immediate cadence users don't get digest scheduled.""" + NotificationPreference.objects.filter(user=self.user).delete() + NotificationPreference.objects.create( + user=self.user, + app='discussion', + type='new_discussion_post', + email=True, + email_cadence=EmailCadence.IMMEDIATELY, + ) + + context = { + 'username': 'User', + 'post_title': 'Test Post' + } + + with override_waffle_flag(ENABLE_NOTIFICATIONS, True): + with override_waffle_flag(ENABLE_EMAIL_NOTIFICATIONS, True): + send_notifications( + [self.user.id], + str(self.course.id), + 'discussion', + 'new_discussion_post', + context, + 'http://test.url' + ) + + # Immediate email should be sent, NOT a digest scheduled + assert mock_ace_send.called + assert not mock_digest_async.called + + +@ddt.ddt +class TestGetNextDigestDeliveryTimeSettingsValidation(ModuleStoreTestCase): + """Tests for settings validation in get_next_digest_delivery_time.""" + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings(NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR=25, NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE=99) + def test_daily_invalid_settings_clamped(self): + """Test that invalid hour/minute values are clamped to valid ranges.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.DAILY) + assert delivery_time.hour == 23 # clamped from 25 + assert delivery_time.minute == 59 # clamped from 99 + + @freeze_time("2026-03-06 10:00:00", tz_offset=0) + @override_settings( + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY=10, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR=-1, + NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE=-5, + ) + def test_weekly_invalid_settings_clamped(self): + """Test that invalid day/hour/minute values are clamped to valid ranges.""" + delivery_time = get_next_digest_delivery_time(EmailCadence.WEEKLY) + assert delivery_time.weekday() == 6 # clamped from 10 → min(6, max(0, 10)) = 6 (Sunday) + assert delivery_time.hour == 0 # clamped from -1 + assert delivery_time.minute == 0 # clamped from -5 + + +@ddt.ddt +class TestCleanupDigestScheduleForCurrentWindow(ModuleStoreTestCase): + """Tests for _cleanup_digest_schedule_for_current_window.""" + + def setUp(self): + super().setUp() + self.user = UserFactory() + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + def test_cleans_up_current_window_record(self): + """Test that the current window's DigestSchedule record is deleted.""" + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + task_id='current-task-id', + ) + + _cleanup_digest_schedule_for_current_window(self.user.id, EmailCadence.DAILY) + + assert not DigestSchedule.objects.filter( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + ).exists() + + @freeze_time("2026-03-06 17:00:00", tz_offset=0) + def test_preserves_future_window_record(self): + """Test that a future window's DigestSchedule record is NOT deleted.""" + # Current window record (should be deleted) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + task_id='current-task-id', + ) + # Future window record (should be preserved) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 7, 17, 0, tzinfo=dt_timezone.utc), + task_id='future-task-id', + ) + + _cleanup_digest_schedule_for_current_window(self.user.id, EmailCadence.DAILY) + + # Current record deleted + assert not DigestSchedule.objects.filter( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 6, 17, 0, tzinfo=dt_timezone.utc), + ).exists() + # Future record preserved + assert DigestSchedule.objects.filter( + user=self.user, + cadence_type=EmailCadence.DAILY, + delivery_time=datetime(2026, 3, 7, 17, 0, tzinfo=dt_timezone.utc), + ).exists() + + @freeze_time("2026-03-09 17:00:00", tz_offset=0) + def test_weekly_preserves_future_record(self): + """Test that weekly cleanup preserves next week's record.""" + # Current week's record (should be deleted) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.WEEKLY, + delivery_time=datetime(2026, 3, 9, 17, 0, tzinfo=dt_timezone.utc), + task_id='current-task-id', + ) + # Next week's record (should be preserved) + DigestSchedule.objects.create( + user=self.user, + cadence_type=EmailCadence.WEEKLY, + delivery_time=datetime(2026, 3, 16, 17, 0, tzinfo=dt_timezone.utc), + task_id='next-week-task-id', + ) + + _cleanup_digest_schedule_for_current_window(self.user.id, EmailCadence.WEEKLY) + + assert not DigestSchedule.objects.filter( + user=self.user, + delivery_time=datetime(2026, 3, 9, 17, 0, tzinfo=dt_timezone.utc), + ).exists() + assert DigestSchedule.objects.filter( + user=self.user, + delivery_time=datetime(2026, 3, 16, 17, 0, tzinfo=dt_timezone.utc), + ).exists() + diff --git a/openedx/core/djangoapps/notifications/email/utils.py b/openedx/core/djangoapps/notifications/email/utils.py index c3a8f62a29c9..2d168c642c57 100644 --- a/openedx/core/djangoapps/notifications/email/utils.py +++ b/openedx/core/djangoapps/notifications/email/utils.py @@ -201,11 +201,12 @@ def get_start_end_date(cadence_type): """ if cadence_type not in [EmailCadence.DAILY, EmailCadence.WEEKLY]: raise ValueError('Invalid cadence_type') - end_date = datetime.datetime.now() + from django.utils import timezone as django_timezone + end_date = django_timezone.now() start_date = end_date - datetime.timedelta(days=1, minutes=15) if cadence_type == EmailCadence.WEEKLY: start_date = start_date - datetime.timedelta(days=6) - return utc.localize(start_date), utc.localize(end_date) + return start_date, end_date def get_course_info(course_key): diff --git a/openedx/core/djangoapps/notifications/management/commands/send_email_digest.py b/openedx/core/djangoapps/notifications/management/commands/send_email_digest.py index 79857c18cf8a..5c310e3937e3 100644 --- a/openedx/core/djangoapps/notifications/management/commands/send_email_digest.py +++ b/openedx/core/djangoapps/notifications/management/commands/send_email_digest.py @@ -1,10 +1,15 @@ """ -Management command for sending email digest +Management command for sending email digest. + +DEPRECATED: This command is retained for backward compatibility. +Digest emails are now scheduled automatically via delayed Celery tasks +when notifications are created. Remove any cron jobs calling this command. """ +import warnings + from django.core.management.base import BaseCommand from openedx.core.djangoapps.notifications.email_notifications import EmailCadence -from openedx.core.djangoapps.notifications.email.tasks import send_digest_email_to_all_users class Command(BaseCommand): @@ -13,9 +18,15 @@ class Command(BaseCommand): python manage.py lms send_email_digest [cadence_type] cadence_type: Daily or Weekly + + DEPRECATED: Digest emails are now automatically scheduled via delayed + Celery tasks when notifications are created. This command is kept for + backward compatibility but will be removed in a future release. """ help = ( - "Send email digest to user." + "DEPRECATED: Send email digest to users. " + "Digest emails are now scheduled automatically. " + "Remove cron jobs using this command." ) def add_arguments(self, parser): @@ -29,4 +40,17 @@ def handle(self, *args, **kwargs): """ Start task to send email digest to users """ - send_digest_email_to_all_users.delay(kwargs['cadence_type']) + warnings.warn( + "The send_email_digest management command is deprecated. " + "Digest emails are now scheduled automatically via delayed Celery tasks " + "when notifications are created. Remove any cron jobs calling this command.", + DeprecationWarning, + stacklevel=2 + ) + self.stderr.write( + self.style.WARNING( + "WARNING: This command is deprecated. Digest emails are now scheduled " + "automatically. Please remove cron jobs using this command." + ) + ) + return diff --git a/openedx/core/djangoapps/notifications/migrations/0012_digestschedule.py b/openedx/core/djangoapps/notifications/migrations/0012_digestschedule.py new file mode 100644 index 000000000000..70b935700203 --- /dev/null +++ b/openedx/core/djangoapps/notifications/migrations/0012_digestschedule.py @@ -0,0 +1,33 @@ +# Generated by Django 5.2.12 on 2026-03-16 08:19 + +import django.db.models.deletion +import django.utils.timezone +import model_utils.fields +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('notifications', '0011_notification_email_scheduled_and_more'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='DigestSchedule', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created', model_utils.fields.AutoCreatedField(default=django.utils.timezone.now, editable=False, verbose_name='created')), + ('modified', model_utils.fields.AutoLastModifiedField(default=django.utils.timezone.now, editable=False, verbose_name='modified')), + ('cadence_type', models.CharField(max_length=20)), + ('delivery_time', models.DateTimeField()), + ('task_id', models.CharField(max_length=255)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='digest_schedules', to=settings.AUTH_USER_MODEL)), + ], + options={ + 'unique_together': {('user', 'cadence_type', 'delivery_time')}, + }, + ), + ] diff --git a/openedx/core/djangoapps/notifications/models.py b/openedx/core/djangoapps/notifications/models.py index 5cd76a01d24c..00af3f36ad50 100644 --- a/openedx/core/djangoapps/notifications/models.py +++ b/openedx/core/djangoapps/notifications/models.py @@ -176,6 +176,34 @@ def get_email_cadence_for_notification_type(self, *args, **kwargs) -> str: return self.email_cadence +class DigestSchedule(TimeStampedModel): + """ + Tracks scheduled Celery digest tasks for daily/weekly email digests. + + One record exists per (user, cadence_type, delivery_time) combination, + representing a single pending Celery task. This is the source of truth for + deduplication of digest scheduling. + + NOTE: This is intentionally separate from Notification.email_scheduled. + Notification.email_scheduled serves the immediate/buffer cadence flow + (decide_email_action → schedule_digest_buffer → send_buffered_digest) and + operates at the notification row level. DigestSchedule operates at the + task level — one record per scheduled Celery job — for daily/weekly digests only. + + .. no_pii: + """ + user = models.ForeignKey(User, related_name="digest_schedules", on_delete=models.CASCADE) + cadence_type = models.CharField(max_length=20) + delivery_time = models.DateTimeField() + task_id = models.CharField(max_length=255) + + class Meta: + unique_together = ('user', 'cadence_type', 'delivery_time') + + def __str__(self): + return f'{self.user.username} - {self.cadence_type} - {self.delivery_time} (task={self.task_id})' + + def create_notification_preference(user_id: int, notification_type: str) -> NotificationPreference: """ Create a single notification preference with appropriate defaults. diff --git a/openedx/core/djangoapps/notifications/tasks.py b/openedx/core/djangoapps/notifications/tasks.py index f44e4ce19729..df3b6d2da4a1 100644 --- a/openedx/core/djangoapps/notifications/tasks.py +++ b/openedx/core/djangoapps/notifications/tasks.py @@ -20,6 +20,8 @@ ) from openedx.core.djangoapps.notifications.email.tasks import send_immediate_cadence_email +from openedx.core.djangoapps.notifications.email.tasks import schedule_user_digest_email +from openedx.core.djangoapps.notifications.email.utils import is_email_notification_flag_enabled from openedx.core.djangoapps.notifications.config.waffle import ( ENABLE_NOTIFICATIONS, ENABLE_PUSH_NOTIFICATIONS @@ -123,6 +125,7 @@ def send_notifications(user_ids, course_key: str, app_name, notification_type, c default_web_config = get_default_values_of_preference(app_name, notification_type).get('web', False) generated_notification_audience = [] email_notification_mapping = {} + digest_schedule_users = {} # {user_id: cadence_type} for daily/weekly digest scheduling push_notification_audience = [] is_push_notification_enabled = ENABLE_PUSH_NOTIFICATIONS.is_enabled(course_key) task_id = str(uuid.uuid4()) @@ -182,6 +185,9 @@ def send_notifications(user_ids, course_key: str, app_name, notification_type, c if email_enabled and (email_cadence == EmailCadence.IMMEDIATELY): email_notification_user_ids.append(user_id) + if email_enabled and email_cadence in (EmailCadence.DAILY, EmailCadence.WEEKLY): + digest_schedule_users[user_id] = email_cadence + if push_notification: push_notification_audience.append(user_id) @@ -216,6 +222,20 @@ def send_notifications(user_ids, course_key: str, app_name, notification_type, c ) send_immediate_cadence_email(email_notification_mapping, course_key) + # Schedule delayed digest emails for users with Daily/Weekly cadence + if digest_schedule_users and is_email_notification_flag_enabled(): + logger.info( + f"Scheduling digest emails for {len(digest_schedule_users)} users " + f"for notification {notification_type}", + ) + for uid, cadence in digest_schedule_users.items(): + try: + schedule_user_digest_email(uid, cadence) + except Exception: # pylint: disable=broad-except + logger.exception( + f"Failed to schedule {cadence} digest email for user {uid}" + ) + if generated_notification: notification_generated_event( generated_notification_audience, app_name, notification_type, course_key, content_url, diff --git a/openedx/envs/common.py b/openedx/envs/common.py index 5d7c105025ed..15f101c0cb91 100644 --- a/openedx/envs/common.py +++ b/openedx/envs/common.py @@ -2663,6 +2663,14 @@ def add_optional_apps(optional_apps, installed_apps): NOTIFICATION_TYPE_ICONS = {} DEFAULT_NOTIFICATION_ICON_URL = "" +# Digest email delivery schedule settings (all times in UTC). +# Instance operators can override these to control when daily/weekly digest emails are sent. +NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR = 17 # Hour of day (0-23) to send daily digest (default: 5 PM UTC) +NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE = 0 # Minute of hour (0-59) to send daily digest +NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY = 0 # Day of week (0=Monday, 6=Sunday) to send weekly digest +NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR = 17 # Hour of day (0-23) to send weekly digest (default: 5 PM UTC) +NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE = 0 # Minute of hour (0-59) to send weekly digest + # These settings are used to override the default notification preferences values for apps and types. # Here is complete documentation about how to use them: openedx/core/djangoapps/notifications/docs/settings.md NOTIFICATION_APPS_OVERRIDE = {}