Skip to content

Commit f9fb0e1

Browse files
feat: use celery instead of cron jobs for notification digest
1 parent 0c5e96d commit f9fb0e1

8 files changed

Lines changed: 1304 additions & 23 deletions

File tree

openedx/core/djangoapps/notifications/email/tasks.py

Lines changed: 326 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""
22
Celery tasks for sending email notifications
33
"""
4+
import hashlib
45
from datetime import datetime, timedelta
56

67
from bs4 import BeautifulSoup
@@ -9,6 +10,7 @@
910
from django.conf import settings
1011
from django.contrib.auth import get_user_model
1112
from django.db import transaction
13+
from django.utils import timezone as django_timezone
1214
from django.utils.translation import gettext as _, override as translation_override
1315
from edx_ace import ace
1416
from edx_ace.recipient import Recipient
@@ -17,6 +19,7 @@
1719

1820
from openedx.core.djangoapps.notifications.email_notifications import EmailCadence
1921
from openedx.core.djangoapps.notifications.models import (
22+
DigestSchedule,
2023
Notification,
2124
NotificationPreference,
2225
)
@@ -114,27 +117,336 @@ def send_digest_email_to_user(
114117
message = add_headers_to_email_message(message, message_context)
115118
message.options['skip_disable_user_policy'] = True
116119
ace.send(message)
117-
notifications.update(email_sent_on=datetime.now())
120+
notifications.update(email_sent_on=django_timezone.now())
118121
send_user_email_digest_sent_event(user, cadence_type, notifications_list, message_context)
119122
logger.info(f'<Email Cadence> Email sent to {user.username} ==Temp Log==')
120123

121124

122-
@shared_task(ignore_result=True)
125+
def get_next_digest_delivery_time(cadence_type):
    """
    Return the next moment at which a digest of the given cadence is due.

    The slot is read from Django settings, each value clamped to its valid range:
    - NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR (default: 17, clamped to 0-23)
    - NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE (default: 0, clamped to 0-59)
    - NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY (default: 0 = Monday, clamped to 0-6)
    - NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR (default: 17, clamped to 0-23)
    - NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE (default: 0, clamped to 0-59)

    Args:
        cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY.

    Returns:
        datetime: The next delivery slot strictly after the current time. It is
        derived from ``django_timezone.now()`` and keeps that value's tzinfo.

    Raises:
        ValueError: If cadence_type is neither DAILY nor WEEKLY.
    """
    current = django_timezone.now()

    if cadence_type == EmailCadence.DAILY:
        slot_hour = max(0, min(23, getattr(settings, 'NOTIFICATION_DAILY_DIGEST_DELIVERY_HOUR', 17)))
        slot_minute = max(0, min(59, getattr(settings, 'NOTIFICATION_DAILY_DIGEST_DELIVERY_MINUTE', 0)))
        days_until_slot = 0
        period = timedelta(days=1)
    elif cadence_type == EmailCadence.WEEKLY:
        slot_weekday = max(0, min(6, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_DAY', 0)))  # 0=Monday
        slot_hour = max(0, min(23, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_HOUR', 17)))
        slot_minute = max(0, min(59, getattr(settings, 'NOTIFICATION_WEEKLY_DIGEST_DELIVERY_MINUTE', 0)))
        # Days from today to the configured weekday, wrapped into 0..6.
        days_until_slot = (slot_weekday - current.weekday()) % 7
        period = timedelta(days=7)
    else:
        raise ValueError(f"Invalid cadence_type for digest scheduling: {cadence_type}")

    candidate = current.replace(
        hour=slot_hour,
        minute=slot_minute,
        second=0,
        microsecond=0,
    ) + timedelta(days=days_until_slot)

    # A slot that is not strictly in the future rolls over by one full period
    # (tomorrow for daily, next week for weekly).
    if candidate <= current:
        candidate += period

    return candidate
182+
183+
184+
def get_digest_dedupe_key(user_id, cadence_type, delivery_time):
    """
    Build a deterministic deduplication key for a digest email task.

    The key is the MD5 hex digest of ``digest:<user_id>:<cadence_type>:<window>``,
    where ``<window>`` is the delivery time truncated to the hour. Two calls for
    the same user and cadence whose delivery times fall within the same hour
    therefore produce the same key.

    Args:
        user_id: ID of the recipient.
        cadence_type: Cadence identifier, interpolated into the key as-is.
        delivery_time: datetime of the planned delivery.

    Returns:
        str: A 32-character hexadecimal digest.
    """
    # Minutes and seconds are deliberately left out of the bucket.
    hour_bucket = delivery_time.strftime('%Y-%m-%d-%H')
    digest = hashlib.md5(f"digest:{user_id}:{cadence_type}:{hour_bucket}".encode())
    return digest.hexdigest()
197+
198+
199+
def is_digest_already_scheduled(user_id, cadence_type, delivery_time):
    """
    Report whether a DigestSchedule row exists for this exact delivery slot.

    The lookup matches on the full (user_id, cadence_type, delivery_time)
    triple. Cadences other than DAILY and WEEKLY are never considered
    scheduled and short-circuit to False without touching the database.

    Args:
        user_id: ID of the recipient.
        cadence_type: EmailCadence value to check.
        delivery_time: Exact delivery datetime to look up.

    Returns:
        bool: True if a matching DigestSchedule row exists, False otherwise.
    """
    digest_cadences = (EmailCadence.DAILY, EmailCadence.WEEKLY)
    if cadence_type in digest_cadences:
        matching_schedules = DigestSchedule.objects.filter(
            user_id=user_id,
            cadence_type=cadence_type,
            delivery_time=delivery_time,
        )
        return matching_schedules.exists()

    return False
219+
220+
221+
def is_digest_already_sent_in_window(user_id, cadence_type, delivery_time):
    """
    Report whether any email notification for the user was sent in the cadence window.

    The window ends at ``delivery_time`` and reaches back one day for DAILY or
    seven days for WEEKLY; both bounds are inclusive. Any other cadence
    returns False without querying.

    NOTE(review): the query matches every Notification with email=True and an
    email_sent_on inside the window; it does not filter by cadence. Confirm
    that emails sent through other cadences should also count as "already sent".

    Args:
        user_id: ID of the recipient.
        cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY.
        delivery_time: End of the window (the planned delivery datetime).

    Returns:
        bool: True if at least one matching notification has email_sent_on in the window.
    """
    if cadence_type == EmailCadence.DAILY:
        lookback = timedelta(days=1)
    elif cadence_type == EmailCadence.WEEKLY:
        lookback = timedelta(days=7)
    else:
        return False

    sent_in_window = Notification.objects.filter(
        user_id=user_id,
        email=True,
        email_sent_on__gte=delivery_time - lookback,
        email_sent_on__lte=delivery_time,
    )
    return sent_in_window.exists()
240+
241+
242+
def schedule_user_digest_email(user_id, cadence_type):
    """
    Schedule a delayed Celery task that will send a digest email to a user.

    Steps, in order:
    1. Bail out if the email notification flag is off or the cadence is not
       DAILY/WEEKLY.
    2. Compute the next delivery time and the deterministic task id for it.
    3. Inside a single ``transaction.atomic()`` block:
       a. ``get_or_create`` the DigestSchedule row for this slot; if it already
          existed, another caller owns this window and nothing more is done.
       b. If an email was already sent in the window, delete the new row and stop.
       c. Flag the user's unsent, unscheduled email notifications created within
          the window as ``email_scheduled=True``; if none were updated, delete
          the new row and stop.
       d. Enqueue ``send_user_digest_email_task`` with ``eta=delivery_time``.

    Because the enqueue happens inside the atomic block, an exception raised by
    ``apply_async`` rolls back the DigestSchedule row and the flag update.

    Args:
        user_id: ID of the user to send digest to
        cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY
    """
    if not is_email_notification_flag_enabled():
        return

    if cadence_type not in [EmailCadence.DAILY, EmailCadence.WEEKLY]:
        logger.warning(f'<Digest Schedule> Invalid cadence_type {cadence_type} for user {user_id}')
        return

    delivery_time = get_next_digest_delivery_time(cadence_type)
    task_id = get_digest_dedupe_key(user_id, cadence_type, delivery_time)
    # Only DAILY and WEEKLY reach this point, so anything not DAILY is WEEKLY.
    lookback = timedelta(days=1) if cadence_type == EmailCadence.DAILY else timedelta(days=7)

    with transaction.atomic():
        schedule_record, is_new = DigestSchedule.objects.get_or_create(
            user_id=user_id,
            cadence_type=cadence_type,
            delivery_time=delivery_time,
            defaults={'task_id': task_id},
        )

        if not is_new:
            # The row was already present: this window has been claimed.
            logger.info(
                f'<Digest Schedule> Digest already scheduled for user {user_id}, '
                f'cadence={cadence_type}, delivery_time={delivery_time}'
            )
            return

        if is_digest_already_sent_in_window(user_id, cadence_type, delivery_time):
            logger.info(
                f'<Digest Schedule> Digest already sent for user {user_id} in this window, '
                f'cadence={cadence_type}, delivery_time={delivery_time}'
            )
            # No task will be queued, so the row created above must not linger.
            schedule_record.delete()
            return

        # Flag the notifications this digest is expected to cover.
        pending_notifications = Notification.objects.filter(
            user_id=user_id,
            email=True,
            email_scheduled=False,
            email_sent_on__isnull=True,
            created__gte=delivery_time - lookback,
        )
        marked_count = pending_notifications.update(email_scheduled=True)

        if not marked_count:
            logger.info(
                f'<Digest Schedule> No unsent notifications to schedule for user {user_id}'
            )
            # Nothing to deliver: drop the row so a later call can claim the window.
            schedule_record.delete()
            return

        # Queue the delayed task; it fires at (or after) delivery_time.
        send_user_digest_email_task.apply_async(
            kwargs={
                'user_id': user_id,
                'cadence_type': cadence_type,
            },
            eta=delivery_time,
            task_id=task_id,
        )

        logger.info(
            f'<Digest Schedule> Scheduled {cadence_type} digest for user {user_id} '
            f'at {delivery_time} (task_id={task_id})'
        )
334+
335+
336+
@shared_task(bind=True, ignore_result=True, max_retries=3, default_retry_delay=300)
@set_code_owner_attribute
def send_user_digest_email_task(self, user_id, cadence_type):
    """
    Delayed Celery task to send a digest email to a single user.

    When it fires, the task:
    1. Skips (and drops the DigestSchedule row for the current window) if email
       notifications are disabled globally, the user has no usable password, or
       the per-user email flag is off.
    2. Checks whether an email was already sent in the cadence window and, if
       so, clears the ``email_scheduled`` flags and stops to avoid a duplicate.
    3. Otherwise sends the digest via ``send_digest_email_to_user``.
    4. Clears the ``email_scheduled`` flags and removes the current window's
       DigestSchedule row.

    Any unexpected exception is logged and retried with exponential backoff
    (300s, 600s, 1200s), up to ``max_retries``.

    Args:
        user_id: ID of the user to send the digest to.
        cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY.
    """
    try:
        if not ENABLE_EMAIL_NOTIFICATIONS.is_enabled():
            logger.info(f'<Digest Task> Email notifications disabled, skipping user {user_id}')
            # Every other early exit drops the schedule row; do the same here so
            # this path does not leave an orphaned DigestSchedule behind.
            _cleanup_digest_schedule_for_current_window(user_id, cadence_type)
            return

        user = User.objects.get(id=user_id)

        # Accounts without a usable password are treated as disabled.
        if not user.has_usable_password():
            logger.info(f'<Digest Task> User {user.username} is disabled, skipping')
            _cleanup_digest_schedule_for_current_window(user_id, cadence_type)
            return

        if not is_email_notification_flag_enabled(user):
            logger.info(f'<Digest Task> Email flag disabled for user {user.username}')
            _cleanup_digest_schedule_for_current_window(user_id, cadence_type)
            return

        start_date, end_date = get_start_end_date(cadence_type)

        already_sent = Notification.objects.filter(
            user_id=user_id,
            email=True,
            email_sent_on__gte=start_date,
            email_sent_on__lte=end_date,
        ).exists()

        if already_sent:
            logger.info(
                f'<Digest Task> Digest already sent for user {user.username} '
                f'in window {start_date} to {end_date}. Clearing scheduled flags.'
            )
            # Clear scheduled flags so they're not picked up again
            _clear_digest_scheduled_flags(user_id, start_date, end_date)
            _cleanup_digest_schedule_for_current_window(user_id, cadence_type)
            return

        language_prefs = get_language_preference_for_users([user_id])
        user_language = language_prefs.get(user_id, 'en')
        courses_data = {}

        send_digest_email_to_user(
            user, cadence_type, start_date, end_date,
            user_language=user_language,
            courses_data=courses_data
        )

        # Clear scheduled flags after successful send
        _clear_digest_scheduled_flags(user_id, start_date, end_date)

        # Remove only the current window's DigestSchedule record — future
        # windows that may have been scheduled concurrently must be preserved.
        _cleanup_digest_schedule_for_current_window(user_id, cadence_type)

        logger.info(f'<Digest Task> Successfully sent {cadence_type} digest to user {user.username}')

    except User.DoesNotExist:
        logger.error(f'<Digest Task> User {user_id} not found')
        # Retrying cannot help for a missing user; just drop the orphaned row.
        _cleanup_digest_schedule_for_current_window(user_id, cadence_type)

    except Exception as exc:
        logger.exception(f'<Digest Task> Failed to send digest to user {user_id}: {exc}')
        # Exponential backoff: the delay doubles with each retry already made.
        retry_countdown = 300 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=retry_countdown)


def _clear_digest_scheduled_flags(user_id, start_date, end_date):
    """
    Reset ``email_scheduled`` on the user's email notifications created in the window.

    Only rows with email=True and email_scheduled=True whose ``created`` value
    lies in [start_date, end_date] (inclusive) are updated.
    """
    Notification.objects.filter(
        user_id=user_id,
        email=True,
        email_scheduled=True,
        created__gte=start_date,
        created__lte=end_date,
    ).update(email_scheduled=False)
425+
426+
427+
def _cleanup_digest_schedule_for_current_window(user_id, cadence_type):
    """
    Delete the user's DigestSchedule rows whose delivery time is recent and not in the future.

    "Recent" means within the last 1 day + 1 hour for DAILY, or the last
    7 days + 1 hour for WEEKLY, measured from the current time. Rows with a
    delivery_time later than now are left alone, so a schedule already
    created for an upcoming window survives. Other cadences are a no-op.

    Args:
        user_id: ID of the user whose schedule rows are removed.
        cadence_type: EmailCadence.DAILY or EmailCadence.WEEKLY.
    """
    current = django_timezone.now()

    if cadence_type == EmailCadence.DAILY:
        # One cadence period plus an hour of slack.
        lookback = timedelta(days=1, hours=1)
    elif cadence_type == EmailCadence.WEEKLY:
        lookback = timedelta(days=7, hours=1)
    else:
        return

    elapsed_schedules = DigestSchedule.objects.filter(
        user_id=user_id,
        cadence_type=cadence_type,
        delivery_time__lte=current,
        delivery_time__gte=current - lookback,
    )
    elapsed_schedules.delete()
138450

139451

140452
def send_immediate_cadence_email(email_notification_mapping, course_key):

0 commit comments

Comments
 (0)