Skip to content

Commit 0662cb7

Browse files
authored
Merge pull request #253 from FunD-StockProject/fix/notification-only-one-push
Fix: 알림 시도때도 없이 발생해서 푸시 알림으로 가던 현상 해결
2 parents 35ed852 + a323fb3 commit 0662cb7

3 files changed

Lines changed: 143 additions & 28 deletions

File tree

src/main/java/com/fund/stockProject/notification/repository/OutboxRepository.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ public interface OutboxRepository extends JpaRepository<OutboxEvent, Integer> {
2323
@Query("SELECT e FROM OutboxEvent e WHERE e.status IN :statuses AND (e.scheduledAt IS NULL OR e.scheduledAt <= :now)")
2424
Page<OutboxEvent> findReadyToProcessInStatuses(@Param("statuses") List<String> statuses, @Param("now") Instant now, Pageable pageable);
2525

26+
@Query("""
27+
SELECT e FROM OutboxEvent e
28+
WHERE e.status = 'PENDING'
29+
AND e.scheduledAt IS NULL
30+
""")
31+
Page<OutboxEvent> findReadyImmediateEvents(Pageable pageable);
32+
33+
@Query("""
34+
SELECT e FROM OutboxEvent e
35+
WHERE e.status = 'READY_TO_SEND'
36+
AND e.scheduledAt <= :now
37+
""")
38+
Page<OutboxEvent> findReadyScheduledEvents(@Param("now") Instant now, Pageable pageable);
39+
2640
@Query("SELECT e FROM OutboxEvent e WHERE e.status = :status AND e.nextAttemptAt <= :now")
2741
List<OutboxEvent> findRetryableEvents(@Param("status") String status, @Param("now") Instant now);
2842

src/main/java/com/fund/stockProject/notification/service/OutboxDispatcher.java

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,45 @@ public OutboxDispatcher(
4343
}
4444

4545
/**
46-
* 즉시 발송 처리 (15분마다)
46+
* 즉시 알림 발송 처리
4747
*/
48-
@Scheduled(fixedDelay = 900000) // 15분 = 900,000ms
48+
@Scheduled(fixedDelay = 60000) // 1분
4949
public void dispatchImmediate() {
5050
try {
51-
// 스케줄러에서는 트랜잭션을 사용하지 않고, 개별 처리에서만 사용
52-
var batch = outboxRepo.findReadyToProcessInStatuses(
53-
List.of("PENDING", "READY_TO_SEND"),
54-
Instant.now(),
55-
PageRequest.of(0, 100)
56-
);
51+
var batch = outboxRepo.findReadyImmediateEvents(PageRequest.of(0, 100));
5752

58-
log.debug("Found {} events to process immediately", batch.getContent().size());
53+
log.debug("Found {} immediate events to process", batch.getContent().size());
5954

6055
for (OutboxEvent e : batch.getContent()) {
6156
if (!"ALERT_CREATED".equals(e.getType())) continue;
6257

63-
// 각 이벤트를 개별 트랜잭션으로 처리
64-
processEventWithTransaction(e);
58+
processEvent(e);
6559
}
6660
} catch (Exception e) {
6761
log.error("Error in dispatchImmediate scheduler", e);
6862
}
6963
}
7064

65+
/**
66+
* 예약 알림 발송 처리 (9시대 매분)
67+
*/
68+
@Scheduled(cron = "0 * 9 * * *", zone = "Asia/Seoul")
69+
public void dispatchScheduled() {
70+
try {
71+
var batch = outboxRepo.findReadyScheduledEvents(Instant.now(), PageRequest.of(0, 200));
72+
73+
log.debug("Found {} scheduled events to process", batch.getContent().size());
74+
75+
for (OutboxEvent e : batch.getContent()) {
76+
if (!"ALERT_CREATED".equals(e.getType())) continue;
77+
78+
processEvent(e);
79+
}
80+
} catch (Exception e) {
81+
log.error("Error in dispatchScheduled scheduler", e);
82+
}
83+
}
84+
7185
/**
7286
* 재시도 이벤트 처리 (5분마다)
7387
*/
@@ -81,27 +95,13 @@ public void dispatchRetry() {
8195
for (OutboxEvent e : retryEvents) {
8296
if (!"ALERT_CREATED".equals(e.getType())) continue;
8397

84-
// 각 이벤트를 개별 트랜잭션으로 처리
85-
processEventWithTransaction(e);
98+
processEvent(e);
8699
}
87100
} catch (Exception e) {
88101
log.error("Error in dispatchRetry scheduler", e);
89102
}
90103
}
91104

92-
/**
93-
* 개별 이벤트를 트랜잭션으로 처리
94-
*/
95-
@Transactional(propagation = Propagation.REQUIRES_NEW)
96-
public void processEventWithTransaction(OutboxEvent e) {
97-
try {
98-
processEvent(e);
99-
} catch (Exception ex) {
100-
log.error("Failed to process event {}: {}", e.getId(), ex.getMessage());
101-
handleError(e, ex);
102-
}
103-
}
104-
105105
/**
106106
* 이벤트 처리 공통 로직
107107
*/
@@ -132,6 +132,7 @@ private void processEvent(OutboxEvent e) {
132132
}
133133

134134
e.setStatus("PROCESSED");
135+
outboxRepo.save(e);
135136
log.info("Notification sent successfully: userId={}, notificationId={}", userId, nId);
136137

137138
} catch (Exception ex) {
@@ -150,6 +151,7 @@ private void handleError(OutboxEvent e, Exception ex) {
150151
long backoffSec = Math.min(300, 1L << Math.min(10, retryCount));
151152
e.setNextAttemptAt(Instant.now().plusSeconds(backoffSec));
152153
e.setStatus("RETRY");
154+
outboxRepo.save(e);
153155

154156
log.warn("Notification dispatch failed, will retry: eventId={}, retryCount={}, nextAttempt={}, error={}",
155157
e.getId(), retryCount, e.getNextAttemptAt(), ex.getMessage());

src/main/java/com/fund/stockProject/notification/service/StockScoreAlertService.java

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.fund.stockProject.notification.service;
22

3+
import com.fasterxml.jackson.core.type.TypeReference;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
35
import com.fund.stockProject.notification.domain.NotificationType;
46
import com.fund.stockProject.notification.entity.Notification;
57
import com.fund.stockProject.notification.entity.OutboxEvent;
@@ -9,23 +11,28 @@
911
import com.fund.stockProject.preference.entity.Preference;
1012
import com.fund.stockProject.preference.repository.PreferenceRepository;
1113
import lombok.RequiredArgsConstructor;
14+
import lombok.extern.slf4j.Slf4j;
1215
import org.springframework.stereotype.Service;
1316
import org.springframework.transaction.annotation.Transactional;
1417

1518
import java.time.Instant;
1619
import java.time.LocalDate;
1720
import java.time.LocalTime;
1821
import java.time.ZoneId;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
1924
import java.util.List;
2025
import java.util.Map;
2126

2227
@Service
2328
@RequiredArgsConstructor
29+
@Slf4j
2430
public class StockScoreAlertService {
2531
private final NotificationRepository notificationRepo;
2632
private final OutboxRepository outboxRepo;
2733
private final NotificationService notificationService;
2834
private final PreferenceRepository preferenceRepo;
35+
private final ObjectMapper objectMapper;
2936

3037
private static final int THRESHOLD_ABS = 15;
3138

@@ -89,10 +96,102 @@ public void sendDailyScoreAlerts() {
8996
// PENDING 상태이고 scheduledAt이 현재 시간 이전인 알림들을 발송
9097
List<OutboxEvent> pendingEvents = outboxRepo.findByStatusAndScheduledAtBefore(
9198
"PENDING", Instant.now());
92-
99+
100+
if (pendingEvents.isEmpty()) {
101+
return;
102+
}
103+
104+
Map<Integer, List<OutboxEvent>> scoreSpikeEventsByUser = new HashMap<>();
105+
int readyCount = 0;
106+
int suppressedCount = 0;
107+
93108
for (OutboxEvent event : pendingEvents) {
94-
// OutboxDispatcher에서 처리하도록 상태 업데이트
109+
Map<String, Object> payload = parsePayload(event.getPayload());
110+
if (payload == null) {
111+
event.setStatus("READY_TO_SEND");
112+
readyCount++;
113+
continue;
114+
}
115+
116+
String type = String.valueOf(payload.get("type"));
117+
Integer userId = toInteger(payload.get("userId"));
118+
119+
if (NotificationType.SCORE_SPIKE.name().equals(type) && userId != null) {
120+
scoreSpikeEventsByUser.computeIfAbsent(userId, key -> new ArrayList<>()).add(event);
121+
continue;
122+
}
123+
95124
event.setStatus("READY_TO_SEND");
125+
readyCount++;
126+
}
127+
128+
for (Map.Entry<Integer, List<OutboxEvent>> entry : scoreSpikeEventsByUser.entrySet()) {
129+
List<OutboxEvent> userEvents = entry.getValue();
130+
if (userEvents.isEmpty()) {
131+
continue;
132+
}
133+
134+
OutboxEvent representative = userEvents.get(0);
135+
representative.setStatus("READY_TO_SEND");
136+
readyCount++;
137+
138+
if (userEvents.size() > 1) {
139+
updateRepresentativeMessage(representative, userEvents.size());
140+
for (int i = 1; i < userEvents.size(); i++) {
141+
userEvents.get(i).setStatus("PROCESSED");
142+
suppressedCount++;
143+
}
144+
}
145+
}
146+
147+
log.info("Daily score alerts prepared: totalPending={}, readyToSend={}, suppressed={}",
148+
pendingEvents.size(), readyCount, suppressedCount);
149+
}
150+
151+
private void updateRepresentativeMessage(OutboxEvent representative, int totalCount) {
152+
Map<String, Object> payload = parsePayload(representative.getPayload());
153+
if (payload == null) {
154+
return;
155+
}
156+
157+
Integer notificationId = toInteger(payload.get("notificationId"));
158+
if (notificationId == null) {
159+
return;
160+
}
161+
162+
notificationRepo.findById(notificationId).ifPresent(notification -> {
163+
notification.setTitle("북마크 종목 점수 급변 알림");
164+
notification.setBody("북마크한 종목 " + totalCount + "개의 점수가 크게 변했습니다.");
165+
notificationRepo.save(notification);
166+
});
167+
}
168+
169+
private Map<String, Object> parsePayload(String payload) {
170+
if (payload == null || payload.isBlank()) {
171+
return null;
172+
}
173+
174+
try {
175+
return objectMapper.readValue(payload, new TypeReference<>() {});
176+
} catch (Exception e) {
177+
log.warn("Failed to parse outbox payload: {}", e.getMessage());
178+
return null;
179+
}
180+
}
181+
182+
private Integer toInteger(Object value) {
183+
if (value == null) {
184+
return null;
185+
}
186+
187+
if (value instanceof Number number) {
188+
return number.intValue();
189+
}
190+
191+
try {
192+
return Integer.valueOf(value.toString());
193+
} catch (NumberFormatException e) {
194+
return null;
96195
}
97196
}
98197
}

0 commit comments

Comments
 (0)