From a5b7d30008e5ad99b4c6a7e863ded867d1ebabe8 Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 15:05:08 +0900 Subject: [PATCH 1/9] =?UTF-8?q?DABOM-508=20chore:=20Kafka=20usage-realtime?= =?UTF-8?q?=20=EC=BB=A8=EC=8A=88=EB=A8=B8=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - UsageRecordKafkaConsumer 삭제 (USAGE_REALTIME 토픽 리스너) - UsageRealtimeBroadcastKafkaConfig 삭제 (브로드캐스트 전용 컨테이너 팩토리) - application.yml에서 spring.kafka.broadcast 설정 제거 - 사용량 데이터는 PollingService의 DB Polling으로 전환 예정 --- .../messaging/UsageRecordKafkaConsumer.java | 52 -------------- .../UsageRealtimeBroadcastKafkaConfig.java | 67 ------------------- src/main/resources/application.yml | 2 - 3 files changed, 121 deletions(-) delete mode 100644 src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java delete mode 100644 src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java diff --git a/src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java b/src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java deleted file mode 100644 index 45d616d..0000000 --- a/src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.project.domain.usagerecord.infra.messaging; - -import java.time.LocalDateTime; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -import com.dabom.messaging.kafka.contract.KafkaEventTypes; -import com.dabom.messaging.kafka.contract.KafkaTopics; -import com.dabom.messaging.kafka.event.KafkaEventMessageSupport; -import com.dabom.messaging.kafka.event.dto.EventEnvelope; -import com.dabom.messaging.kafka.event.dto.usage.UsageRealtimePayload; -import com.fasterxml.jackson.core.type.TypeReference; -import com.project.domain.usagerecord.infra.messaging.config.UsageRealtimeBroadcastKafkaConfig; -import com.project.domain.usagerecord.infra.sse.SsePublisher; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Component -@RequiredArgsConstructor -public class UsageRecordKafkaConsumer { - - private final KafkaEventMessageSupport kafkaEventMessageSupport; - private final SsePublisher ssePublisher; - - @KafkaListener( - topics = KafkaTopics.USAGE_REALTIME, - containerFactory = - UsageRealtimeBroadcastKafkaConfig - .USAGE_REALTIME_BROADCAST_KAFKA_LISTENER_CONTAINER_FACTORY) - public void consume(ConsumerRecord record) { - kafkaEventMessageSupport.consumeByEventType( - record, - KafkaEventTypes.USAGE_REALTIME, - new TypeReference>() {}, - (envelope, key) -> { - UsageRealtimePayload payload = envelope.payload(); - LocalDateTime publishTime = envelope.timestamp(); - - log.info( - "FamilyId:{}, totalUsedBytes:{}", - payload.familyId(), - payload.totalUsedBytes()); - - ssePublisher.pushMemberUsageBytes(payload, publishTime); - ssePublisher.pushTotalUsageBytes(payload, publishTime); - }); - } -} diff --git a/src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java b/src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java deleted file mode 100644 index 265caba..0000000 --- a/src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.project.domain.usagerecord.infra.messaging.config; - -import java.util.UUID; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.CommonErrorHandler; - -import com.dabom.messaging.kafka.autoconfigure.KafkaConfig; -import com.dabom.messaging.kafka.autoconfigure.KafkaErrorHandlerConfig; -import com.dabom.messaging.kafka.error.KafkaExceptionClassifier; -import com.dabom.messaging.kafka.event.KafkaEventMessageSupport; -import com.dabom.messaging.kafka.event.publisher.DefaultKafkaEventPublisher; -import com.dabom.messaging.kafka.metrics.KafkaMetrics; -import com.dabom.messaging.kafka.metrics.consumer.KafkaMetricsRecordInterceptor; -import com.dabom.messaging.kafka.metrics.producer.KafkaMetricsProducerListener; -import com.dabom.messaging.kafka.support.KafkaEventMetadataExtractor; -import com.dabom.messaging.kafka.support.KafkaLogSanitizer; - -@Configuration -@Import({ - KafkaConfig.class, - KafkaErrorHandlerConfig.class, - KafkaMetrics.class, - KafkaExceptionClassifier.class, - KafkaEventMetadataExtractor.class, - KafkaLogSanitizer.class, - KafkaEventMessageSupport.class, - KafkaMetricsRecordInterceptor.class, - KafkaMetricsProducerListener.class, - DefaultKafkaEventPublisher.class -}) -public class UsageRealtimeBroadcastKafkaConfig { - - public static final String USAGE_REALTIME_BROADCAST_KAFKA_LISTENER_CONTAINER_FACTORY = - "usageRealtimeBroadcastKafkaListenerContainerFactory"; - - private static final int GROUP_ID_SUFFIX_LENGTH = 8; - - @Bean(USAGE_REALTIME_BROADCAST_KAFKA_LISTENER_CONTAINER_FACTORY) - public ConcurrentKafkaListenerContainerFactory - usageRealtimeBroadcastKafkaListenerContainerFactory( - ConsumerFactory consumerFactory, - CommonErrorHandler kafkaCommonErrorHandler, - KafkaMetricsRecordInterceptor kafkaMetricsRecordInterceptor, - @Value("${spring.kafka.broadcast.group-id-prefix:usage-realtime}") - String broadcastGroupIdPrefix) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - factory.getContainerProperties() - .setGroupId( - broadcastGroupIdPrefix - + "-" - + UUID.randomUUID() - .toString() - .substring(0, GROUP_ID_SUFFIX_LENGTH)); - factory.getContainerProperties().setObservationEnabled(true); - factory.setRecordInterceptor(kafkaMetricsRecordInterceptor); - factory.setCommonErrorHandler(kafkaCommonErrorHandler); - return factory; - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3af9dc8..e16511c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -40,8 +40,6 @@ spring: consumer: group-id: ${KAFKA_CONSUMER_GROUP_ID:dabom-api-notification} auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET:earliest} - broadcast: - group-id-prefix: ${KAFKA_BROADCAST_GROUP_PREFIX:usage-realtime} producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer From 9ce86ea9f00dffb8bef8e8576dc8ecb9ee628d64 Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 15:07:02 +0900 Subject: [PATCH 2/9] =?UTF-8?q?DABOM-508=20feat:=20=EC=82=AC=EC=9A=A9?= =?UTF-8?q?=EB=9F=89=20=EB=8D=B0=EC=9D=B4=ED=84=B0=20DB=20Polling=EC=9C=BC?= =?UTF-8?q?=EB=A1=9C=20=EC=A0=84=ED=99=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PollingService를 Redis polling에서 DB 직접 polling으로 전환 - FamilyRepository에서 totalQuotaBytes, usedBytes 조회 - CustomerQuotaRepository 신규 생성, familyId 기준 멤버별 monthlyUsedBytes 조회 - 하드코딩 제거 (totalLimitBytes=20000, customerId=4L 등) - SsePublisher에서 pushTotalUsageBytes, pushMemberUsageBytes 및 CAS 맵 제거 - SsePublisher는 pushNotificationEvent만 유지 --- .../repository/CustomerQuotaRepository.java | 12 +++ .../usagerecord/infra/sse/PollingService.java | 70 +++++++------- .../usagerecord/infra/sse/SsePublisher.java | 96 ------------------- .../infra/sse/PollingServiceTest.java | 7 +- 4 files changed, 53 insertions(+), 132 deletions(-) create mode 100644 src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java diff --git a/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java new file mode 100644 index 0000000..d5ca81f --- /dev/null +++ b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java @@ -0,0 +1,12 @@ +package com.project.domain.customer.repository; + +import java.util.List; + +import org.springframework.data.jpa.repository.JpaRepository; + +import com.project.domain.customer.entity.CustomerQuota; + +public interface CustomerQuotaRepository extends JpaRepository { + + List findByFamilyId(Long familyId); +} diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java index bcadb7e..2651cd1 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java @@ -1,17 +1,22 @@ package com.project.domain.usagerecord.infra.sse; -import java.time.LocalDateTime; -import java.util.Optional; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import com.dabom.messaging.kafka.event.dto.usage.UsageRealtimePayload; -import com.project.domain.family.infra.cache.FamilyCacheRepository; +import com.project.domain.customer.entity.CustomerQuota; +import com.project.domain.customer.repository.CustomerQuotaRepository; +import com.project.domain.family.entity.Family; +import com.project.domain.family.repository.FamilyRepository; +import com.project.domain.usagerecord.dto.response.RealtimeTotalUsageResponse; +import com.project.domain.usagerecord.dto.response.RealtimeUsageByMemberResponse; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +@Slf4j @Service @RequiredArgsConstructor public class PollingService { @@ -19,46 +24,45 @@ public class PollingService { private static final long HEARTBEAT_DELAY_MS = 25_000L; private final EmitterRegistry emitterRegistry; - private final FamilyCacheRepository familyCacheRepository; - private final ConcurrentHashMap lastSeen = new ConcurrentHashMap<>(); - private final SsePublisher ssePublisher; + private final FamilyRepository familyRepository; + private final CustomerQuotaRepository customerQuotaRepository; - // 1) 활성 familyId를 순회하며 최신 잔여 용량을 조회합니다. - // 2) 이전 값과 다를 때만 페이로드를 생성해 SSE로 전송합니다. + private final ConcurrentHashMap lastSeenUsedBytes = new ConcurrentHashMap<>(); + + // 1) 활성 familyId를 순회하며 DB에서 최신 사용량을 조회합니다. + // 2) 이전 값과 다를 때만 SSE로 전송합니다. @Scheduled(fixedDelay = 1000) public void pollAndPushIfChanged() { for (Long familyId : emitterRegistry.activeFamilyIds()) { - Optional latestOpt = familyCacheRepository.findFamilyRemainingBytes(familyId); - if (latestOpt.isEmpty()) { - lastSeen.remove(familyId); + Family family = familyRepository.findById(familyId).orElse(null); + if (family == null) { + lastSeenUsedBytes.remove(familyId); continue; } - long remainingBytes = latestOpt.get(); - Long prev = lastSeen.putIfAbsent(familyId, remainingBytes); + long usedBytes = family.getUsedBytes(); + Long prev = lastSeenUsedBytes.putIfAbsent(familyId, usedBytes); - if (prev == null || prev.longValue() != remainingBytes) { - lastSeen.put(familyId, remainingBytes); + if (prev == null || prev.longValue() != usedBytes) { + lastSeenUsedBytes.put(familyId, usedBytes); - // 임시로 고정 - long totalLimitBytes = 20000; - long totalUsedBytes = totalLimitBytes - remainingBytes; + long totalQuotaBytes = family.getTotalQuotaBytes(); + long remainingBytes = totalQuotaBytes - usedBytes; - UsageRealtimePayload payload = - new UsageRealtimePayload( - familyId, - 4L, - totalUsedBytes, - totalLimitBytes, - remainingBytes, - 30.0, - null, - null, - null); + RealtimeTotalUsageResponse totalResponse = + new RealtimeTotalUsageResponse( + familyId, usedBytes, totalQuotaBytes, remainingBytes); + emitterRegistry.send(familyId, "usage-updated", totalResponse); - LocalDateTime now = LocalDateTime.now(); - ssePublisher.pushTotalUsageBytes(payload, now); - ssePublisher.pushMemberUsageBytes(payload, now); + List quotas = customerQuotaRepository.findByFamilyId(familyId); + for (CustomerQuota quota : quotas) { + RealtimeUsageByMemberResponse memberResponse = + new RealtimeUsageByMemberResponse( + familyId, + quota.getCustomerId(), + quota.getMonthlyUsedBytes()); + emitterRegistry.send(familyId, "usage-updated-by-member", memberResponse); + } } } } diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java b/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java index 2b0d9ae..4e56c7a 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java @@ -1,16 +1,7 @@ package com.project.domain.usagerecord.infra.sse; -import java.time.LocalDateTime; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import com.dabom.messaging.kafka.event.dto.usage.UsageRealtimePayload; -import com.project.domain.usagerecord.dto.response.RealtimeTotalUsageResponse; -import com.project.domain.usagerecord.dto.response.RealtimeUsageByMemberResponse; - import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -21,94 +12,7 @@ public class SsePublisher { private final EmitterRegistry emitterRegistry; - private final ConcurrentHashMap> - lastTotalBytesTimeByFamily = new ConcurrentHashMap<>(); - - private final ConcurrentHashMap> - lastTotalMemberBytesTimeByFamily = new ConcurrentHashMap<>(); - - // 1) 가족별 최신 발행 시각보다 오래된 이벤트는 폐기합니다. - // 2) 최신 이벤트만 응답 DTO로 변환해 SSE로 전송합니다. - @Async - public void pushTotalUsageBytes(UsageRealtimePayload payload, LocalDateTime publishedDateTime) { - log.info("pushTotalUsageBytes thread : {}", Thread.currentThread().getName()); - Long familyId = payload.familyId(); - AtomicReference lastRef = totalTsRef(familyId); - - while (true) { - LocalDateTime current = lastRef.get(); - - if (!publishedDateTime.isAfter(current)) { - log.info( - "drop older total event: familyId={}, incoming={}, last={}", - familyId, - publishedDateTime, - current); - return; - } - - if (lastRef.compareAndSet(current, publishedDateTime)) { - break; - } - } - - RealtimeTotalUsageResponse response = - new RealtimeTotalUsageResponse( - payload.familyId(), - payload.totalUsedBytes(), - payload.totalLimitBytes(), - payload.remainingBytes()); - - emitterRegistry.send(familyId, "usage-updated", response); - } - - // 1) 가족별 멤버 사용량 이벤트의 최신 시각을 CAS로 보장합니다. - // 2) 최신 이벤트만 멤버 단위 응답으로 변환해 SSE로 전송합니다. - @Async - public void pushMemberUsageBytes( - UsageRealtimePayload payload, LocalDateTime publishedDateTime) { - log.info("pushMemberUsageBytes thread : {}", Thread.currentThread().getName()); - - Long familyId = payload.familyId(); - Long customerId = payload.customerId(); - AtomicReference lastRef = memberTsRef(familyId); - - while (true) { - LocalDateTime current = lastRef.get(); - - if (!publishedDateTime.isAfter(current)) { - log.info( - "drop older member event: familyId={}, customerId={}, incoming={}, last={}", - familyId, - customerId, - publishedDateTime, - current); - return; - } - - if (lastRef.compareAndSet(current, publishedDateTime)) { - break; - } - } - - RealtimeUsageByMemberResponse response = - new RealtimeUsageByMemberResponse(familyId, customerId, payload.monthlyUsedBytes()); - emitterRegistry.send(familyId, "usage-updated-by-member", response); - } - public void pushNotificationEvent(Long familyId, String eventType, Object payload) { emitterRegistry.send(familyId, eventType, payload); } - - // 가족별 총 사용량 최신 시각 저장소를 초기화하거나 반환합니다. - private AtomicReference totalTsRef(Long familyId) { - return lastTotalBytesTimeByFamily.computeIfAbsent( - familyId, id -> new AtomicReference<>(LocalDateTime.MIN)); - } - - // 가족별 멤버 사용량 최신 시각 저장소를 초기화하거나 반환합니다. - private AtomicReference memberTsRef(Long familyId) { - return lastTotalMemberBytesTimeByFamily.computeIfAbsent( - familyId, id -> new AtomicReference<>(LocalDateTime.MIN)); - } } diff --git a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java index a423651..65e4d88 100644 --- a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java +++ b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java @@ -9,16 +9,17 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.project.domain.family.infra.cache.FamilyCacheRepository; +import com.project.domain.customer.repository.CustomerQuotaRepository; +import com.project.domain.family.repository.FamilyRepository; @ExtendWith(MockitoExtension.class) class PollingServiceTest { @Mock private EmitterRegistry emitterRegistry; - @Mock private FamilyCacheRepository familyCacheRepository; + @Mock private FamilyRepository familyRepository; - @Mock private SsePublisher ssePublisher; + @Mock private CustomerQuotaRepository customerQuotaRepository; @InjectMocks private PollingService pollingService; From 0d8d056977e4ee78d75013f0153f8c2049e9fc8c Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 15:51:43 +0900 Subject: [PATCH 3/9] =?UTF-8?q?DABOM-508=20fix:=20PollingService=20N+1=20?= =?UTF-8?q?=ED=95=B4=EC=86=8C=20=EB=B0=8F=20Optional=20=EC=B2=98=EB=A6=AC?= =?UTF-8?q?=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - familyRepository.findAllById()로 활성 family 일괄 조회하여 N+1 방지 - customerQuotaRepository.findByFamilyIdIn()으로 변경된 family의 멤버 쿼터 일괄 조회 - findById().orElse(null) 패턴 제거, Map 기반 조회로 전환 - 변경된 familyId만 필터링 후 멤버 쿼터 조회하여 불필요한 쿼리 최소화 --- .../repository/CustomerQuotaRepository.java | 3 ++ .../usagerecord/infra/sse/PollingService.java | 52 ++++++++++++++----- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java index d5ca81f..bb5883d 100644 --- a/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java +++ b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java @@ -1,5 +1,6 @@ package com.project.domain.customer.repository; +import java.util.Collection; import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; @@ -9,4 +10,6 @@ public interface CustomerQuotaRepository extends JpaRepository { List findByFamilyId(Long familyId); + + List findByFamilyIdIn(Collection familyIds); } diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java index 2651cd1..d0d251a 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java @@ -1,7 +1,12 @@ package com.project.domain.usagerecord.infra.sse; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -29,12 +34,22 @@ public class PollingService { private final ConcurrentHashMap lastSeenUsedBytes = new ConcurrentHashMap<>(); - // 1) 활성 familyId를 순회하며 DB에서 최신 사용량을 조회합니다. - // 2) 이전 값과 다를 때만 SSE로 전송합니다. + // 1) 활성 familyId 전체를 일괄 조회하여 N+1 문제를 방지합니다. + // 2) 이전 값과 다른 family만 필터링하여 SSE로 전송합니다. @Scheduled(fixedDelay = 1000) public void pollAndPushIfChanged() { - for (Long familyId : emitterRegistry.activeFamilyIds()) { - Family family = familyRepository.findById(familyId).orElse(null); + Set activeFamilyIds = emitterRegistry.activeFamilyIds(); + if (activeFamilyIds.isEmpty()) { + return; + } + + Map familiesById = + familyRepository.findAllById(activeFamilyIds).stream() + .collect(Collectors.toMap(Family::getId, Function.identity())); + + List changedFamilyIds = new ArrayList<>(); + for (Long familyId : activeFamilyIds) { + Family family = familiesById.get(familyId); if (family == null) { lastSeenUsedBytes.remove(familyId); continue; @@ -45,6 +60,7 @@ public void pollAndPushIfChanged() { if (prev == null || prev.longValue() != usedBytes) { lastSeenUsedBytes.put(familyId, usedBytes); + changedFamilyIds.add(familyId); long totalQuotaBytes = family.getTotalQuotaBytes(); long remainingBytes = totalQuotaBytes - usedBytes; @@ -53,16 +69,26 @@ public void pollAndPushIfChanged() { new RealtimeTotalUsageResponse( familyId, usedBytes, totalQuotaBytes, remainingBytes); emitterRegistry.send(familyId, "usage-updated", totalResponse); + } + } + + if (changedFamilyIds.isEmpty()) { + return; + } + + Map> quotasByFamilyId = + customerQuotaRepository.findByFamilyIdIn(changedFamilyIds).stream() + .collect(Collectors.groupingBy(CustomerQuota::getFamilyId)); - List quotas = customerQuotaRepository.findByFamilyId(familyId); - for (CustomerQuota quota : quotas) { - RealtimeUsageByMemberResponse memberResponse = - new RealtimeUsageByMemberResponse( - familyId, - quota.getCustomerId(), - quota.getMonthlyUsedBytes()); - emitterRegistry.send(familyId, "usage-updated-by-member", memberResponse); - } + for (Long familyId : changedFamilyIds) { + List quotas = quotasByFamilyId.getOrDefault(familyId, List.of()); + for (CustomerQuota quota : quotas) { + RealtimeUsageByMemberResponse memberResponse = + new RealtimeUsageByMemberResponse( + familyId, + quota.getCustomerId(), + quota.getMonthlyUsedBytes()); + emitterRegistry.send(familyId, "usage-updated-by-member", memberResponse); } } } From 4c1a7b1aacada07914f4adb1405e44f7c830d271 Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 15:52:45 +0900 Subject: [PATCH 4/9] =?UTF-8?q?DABOM-508=20test:=20PollingService=20pollAn?= =?UTF-8?q?dPushIfChanged=20=EB=8B=A8=EC=9C=84=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EB=B3=B4=EA=B0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 사용량 변경 시 usage-updated, usage-updated-by-member 이벤트 전송 검증 - 사용량 미변경 시 이벤트 미전송 검증 - Family 미존재 시 이벤트 미전송 검증 --- .../infra/sse/PollingServiceTest.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java index 65e4d88..e61e0b7 100644 --- a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java +++ b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java @@ -1,6 +1,13 @@ package com.project.domain.usagerecord.infra.sse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -9,8 +16,12 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.project.domain.customer.entity.CustomerQuota; import com.project.domain.customer.repository.CustomerQuotaRepository; +import com.project.domain.family.entity.Family; import com.project.domain.family.repository.FamilyRepository; +import com.project.domain.usagerecord.dto.response.RealtimeTotalUsageResponse; +import com.project.domain.usagerecord.dto.response.RealtimeUsageByMemberResponse; @ExtendWith(MockitoExtension.class) class PollingServiceTest { @@ -32,4 +43,89 @@ void sendHeartbeat_calls_EmitterRegistry() { // then verify(emitterRegistry).sendHeartbeat(); } + + @Test + @DisplayName("사용량이 변경되면 usage-updated와 usage-updated-by-member 이벤트를 전송합니다.") + void pollAndPushIfChanged_sendsEvents_whenUsageChanged() { + // given + Long familyId = 1L; + Family family = Family.builder() + .id(familyId) + .name("test") + .createdById(1L) + .totalQuotaBytes(10000L) + .usedBytes(3000L) + .build(); + CustomerQuota quota = CustomerQuota.builder() + .customerId(10L) + .familyId(familyId) + .monthlyUsedBytes(3000L) + .build(); + + when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); + when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of(family)); + when(customerQuotaRepository.findByFamilyIdIn(List.of(familyId))) + .thenReturn(List.of(quota)); + + // when + pollingService.pollAndPushIfChanged(); + + // then + verify(emitterRegistry) + .send( + eq(familyId), + eq("usage-updated"), + eq(new RealtimeTotalUsageResponse(familyId, 3000L, 10000L, 7000L))); + verify(emitterRegistry) + .send( + eq(familyId), + eq("usage-updated-by-member"), + eq(new RealtimeUsageByMemberResponse(familyId, 10L, 3000L))); + } + + @Test + @DisplayName("사용량 변경이 없으면 이벤트를 전송하지 않습니다.") + void pollAndPushIfChanged_doesNotSendEvents_whenUsageUnchanged() { + // given + Long familyId = 1L; + Family family = Family.builder() + .id(familyId) + .name("test") + .createdById(1L) + .totalQuotaBytes(10000L) + .usedBytes(3000L) + .build(); + + when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); + when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of(family)); + when(customerQuotaRepository.findByFamilyIdIn(List.of(familyId))) + .thenReturn(List.of()); + + // 첫 호출: 초기값 설정 + pollingService.pollAndPushIfChanged(); + + // when: 두 번째 호출 (값 동일) + pollingService.pollAndPushIfChanged(); + + // then: usage-updated는 첫 호출에서 1번만 발생 + verify(emitterRegistry) + .send(eq(familyId), eq("usage-updated"), any(RealtimeTotalUsageResponse.class)); + } + + @Test + @DisplayName("familyId에 해당하는 Family가 없으면 이벤트를 전송하지 않습니다.") + void pollAndPushIfChanged_doesNotSendEvents_whenFamilyNotFound() { + // given + Long familyId = 999L; + + when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); + when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of()); + + // when + pollingService.pollAndPushIfChanged(); + + // then + verify(emitterRegistry, never()).send(any(), eq("usage-updated"), any()); + verify(emitterRegistry, never()).send(any(), eq("usage-updated-by-member"), any()); + } } From 45e595363ab979c8f2fffabba8c88c47a7805ac5 Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 16:57:17 +0900 Subject: [PATCH 5/9] =?UTF-8?q?DABOM-508=20refactor:=20PollingService=20?= =?UTF-8?q?=EB=A7=A4=EC=A7=81=20=EC=8A=A4=ED=8A=B8=EB=A7=81=20=EC=83=81?= =?UTF-8?q?=EC=88=98=20=EC=B6=94=EC=B6=9C=20=EB=B0=8F=20=EB=B3=80=EA=B2=BD?= =?UTF-8?q?=20=EA=B0=90=EC=A7=80=20=EB=A1=9C=EC=A7=81=20=EA=B0=84=EA=B2=B0?= =?UTF-8?q?=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 이벤트명 "usage-updated", "usage-updated-by-member"를 상수로 추출 - putIfAbsent + put 중복 호출을 get + put 단일 호출로 간결화 --- .../domain/usagerecord/infra/sse/PollingService.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java index d0d251a..73108a1 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java @@ -27,6 +27,8 @@ public class PollingService { private static final long HEARTBEAT_DELAY_MS = 25_000L; + private static final String EVENT_USAGE_UPDATED = "usage-updated"; + private static final String EVENT_USAGE_UPDATED_BY_MEMBER = "usage-updated-by-member"; private final EmitterRegistry emitterRegistry; private final FamilyRepository familyRepository; @@ -56,7 +58,7 @@ public void pollAndPushIfChanged() { } long usedBytes = family.getUsedBytes(); - Long prev = lastSeenUsedBytes.putIfAbsent(familyId, usedBytes); + Long prev = lastSeenUsedBytes.get(familyId); if (prev == null || prev.longValue() != usedBytes) { lastSeenUsedBytes.put(familyId, usedBytes); @@ -68,7 +70,7 @@ public void pollAndPushIfChanged() { RealtimeTotalUsageResponse totalResponse = new RealtimeTotalUsageResponse( familyId, usedBytes, totalQuotaBytes, remainingBytes); - emitterRegistry.send(familyId, "usage-updated", totalResponse); + emitterRegistry.send(familyId, EVENT_USAGE_UPDATED, totalResponse); } } @@ -88,7 +90,7 @@ public void pollAndPushIfChanged() { familyId, quota.getCustomerId(), quota.getMonthlyUsedBytes()); - emitterRegistry.send(familyId, "usage-updated-by-member", memberResponse); + emitterRegistry.send(familyId, EVENT_USAGE_UPDATED_BY_MEMBER, memberResponse); } } } From 1d52881d9dcc4b5a21d23ff2933931836dddf30a Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 17:09:09 +0900 Subject: [PATCH 6/9] =?UTF-8?q?DABOM-508=20fix:=20PollingService=20?= =?UTF-8?q?=EC=8A=A4=ED=83=80=EC=9D=BC=20=EA=B0=80=EC=9D=B4=EB=93=9C=20?= =?UTF-8?q?=EC=A4=80=EC=88=98=20=EB=B3=B4=EC=99=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - @Scheduled(fixedDelay = 1000) 매직넘버를 POLLING_DELAY_MS 상수로 추출 - DB 조회 메서드에 @Transactional(readOnly = true) 선언하여 일관된 읽기 보장 - spotlessApply 포매팅 적용 --- .../domain/usagerecord/infra/sse/PollingService.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java index 73108a1..fb7933d 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java @@ -10,6 +10,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import com.project.domain.customer.entity.CustomerQuota; import com.project.domain.customer.repository.CustomerQuotaRepository; @@ -26,6 +27,7 @@ @RequiredArgsConstructor public class PollingService { + private static final long POLLING_DELAY_MS = 1_000L; private static final long HEARTBEAT_DELAY_MS = 25_000L; private static final String EVENT_USAGE_UPDATED = "usage-updated"; private static final String EVENT_USAGE_UPDATED_BY_MEMBER = "usage-updated-by-member"; @@ -38,7 +40,8 @@ public class PollingService { // 1) 활성 familyId 전체를 일괄 조회하여 N+1 문제를 방지합니다. // 2) 이전 값과 다른 family만 필터링하여 SSE로 전송합니다. - @Scheduled(fixedDelay = 1000) + @Transactional(readOnly = true) + @Scheduled(fixedDelay = POLLING_DELAY_MS) public void pollAndPushIfChanged() { Set activeFamilyIds = emitterRegistry.activeFamilyIds(); if (activeFamilyIds.isEmpty()) { @@ -87,9 +90,7 @@ public void pollAndPushIfChanged() { for (CustomerQuota quota : quotas) { RealtimeUsageByMemberResponse memberResponse = new RealtimeUsageByMemberResponse( - familyId, - quota.getCustomerId(), - quota.getMonthlyUsedBytes()); + familyId, quota.getCustomerId(), quota.getMonthlyUsedBytes()); emitterRegistry.send(familyId, EVENT_USAGE_UPDATED_BY_MEMBER, memberResponse); } } From daf99e34a7c28ceda21e3d499d9695ebdd3a7e55 Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 17:14:37 +0900 Subject: [PATCH 7/9] =?UTF-8?q?DABOM-508=20chore:=20=EB=AF=B8=EC=82=AC?= =?UTF-8?q?=EC=9A=A9=20Redis=20=EC=BA=90=EC=8B=9C=20=EA=B4=80=EB=A0=A8=20?= =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FamilyCacheRepository 삭제 (DB Polling 전환으로 사용처 없음) - FamilyCacheDto 삭제 (FamilyCacheRepository 전용) - RedisKeyGenerator 삭제 (FamilyCacheRepository 전용) - RedisConfig 삭제 (두 RedisTemplate bean 모두 FamilyCacheRepository 전용) --- .../project/common/config/RedisConfig.java | 57 ---------------- .../common/util/RedisKeyGenerator.java | 41 ------------ .../infra/cache/FamilyCacheRepository.java | 66 ------------------- .../infra/cache/dto/FamilyCacheDto.java | 35 ---------- 4 files changed, 199 deletions(-) delete mode 100644 src/main/java/com/project/common/config/RedisConfig.java delete mode 100644 src/main/java/com/project/common/util/RedisKeyGenerator.java delete mode 100644 src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java delete mode 100644 src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java diff --git a/src/main/java/com/project/common/config/RedisConfig.java b/src/main/java/com/project/common/config/RedisConfig.java deleted file mode 100644 index 1439bad..0000000 --- a/src/main/java/com/project/common/config/RedisConfig.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.project.common.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; -import org.springframework.data.redis.serializer.StringRedisSerializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.project.domain.family.infra.cache.dto.FamilyCacheDto; - -@Configuration -public class RedisConfig { - - @Bean - public RedisTemplate familyStringRedisTemplate( - RedisConnectionFactory connectionFactory) { - RedisTemplate template = new RedisTemplate<>(); - template.setConnectionFactory(connectionFactory); - - StringRedisSerializer serializer = new StringRedisSerializer(); - template.setKeySerializer(serializer); - template.setHashKeySerializer(serializer); - template.setValueSerializer(serializer); - template.setHashValueSerializer(serializer); - - template.afterPropertiesSet(); - return template; - } - - @Bean - public RedisTemplate familyCacheRedisTemplate( - RedisConnectionFactory connectionFactory) { - RedisTemplate template = new RedisTemplate<>(); - template.setConnectionFactory(connectionFactory); - - StringRedisSerializer keySerializer = new StringRedisSerializer(); - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(new JavaTimeModule()); - objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - - Jackson2JsonRedisSerializer valueSerializer = - new Jackson2JsonRedisSerializer(objectMapper, FamilyCacheDto.class); - - template.setKeySerializer(keySerializer); - template.setHashKeySerializer(keySerializer); - template.setValueSerializer(valueSerializer); - template.setHashValueSerializer(valueSerializer); - - template.afterPropertiesSet(); - return template; - } -} diff --git a/src/main/java/com/project/common/util/RedisKeyGenerator.java b/src/main/java/com/project/common/util/RedisKeyGenerator.java deleted file mode 100644 index b6fd416..0000000 --- a/src/main/java/com/project/common/util/RedisKeyGenerator.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.project.common.util; - -import org.springframework.stereotype.Component; - -@Component -public class RedisKeyGenerator { - - private static final String KEY_SEPARATOR = ":"; - private static final String EXAMPLE_KEY_PREFIX = "example"; - private static final String FAMILY_KEY_PREFIX = "family"; - - public String generateExampleKey(Long exampleId) { - return EXAMPLE_KEY_PREFIX + KEY_SEPARATOR + exampleId; - } - - public String generateFamilyInfoKey(Long familyId) { - return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "info"; - } - - public String generateFamilyRemainingKey(Long familyId) { - return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "remaining"; - } - - public String generateFamilyCustomerMonthlyUsageKey(Long familyId, Long customerId) { - return FAMILY_KEY_PREFIX - + KEY_SEPARATOR - + familyId - + KEY_SEPARATOR - + "customer" - + KEY_SEPARATOR - + customerId - + KEY_SEPARATOR - + "usage" - + KEY_SEPARATOR - + "monthly"; - } - - public String generateFamilyKey(Long familyId) { - return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId; - } -} diff --git a/src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java b/src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java deleted file mode 100644 index 1629fc7..0000000 --- a/src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.project.domain.family.infra.cache; - -import java.util.Optional; - -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Repository; - -import com.project.common.util.RedisKeyGenerator; -import com.project.domain.family.entity.Family; -import com.project.domain.family.infra.cache.dto.FamilyCacheDto; - -import lombok.RequiredArgsConstructor; - -@Repository -@RequiredArgsConstructor -public class FamilyCacheRepository { - - private final RedisTemplate familyStringRedisTemplate; - private final RedisTemplate familyCacheRedisTemplate; - private final RedisKeyGenerator redisKeyGenerator; - - public void save(Family family) { - String key = redisKeyGenerator.generateFamilyInfoKey(family.getId()); - familyCacheRedisTemplate.opsForValue().set(key, FamilyCacheDto.from(family)); - } - - public Optional findById(Long familyId) { - String key = redisKeyGenerator.generateFamilyInfoKey(familyId); - FamilyCacheDto dto = familyCacheRedisTemplate.opsForValue().get(key); - - if (dto == null) { - return Optional.empty(); - } - - return Optional.of(dto.toEntity()); - } - - public void evict(Long familyId) { - String key = redisKeyGenerator.generateFamilyInfoKey(familyId); - familyCacheRedisTemplate.delete(key); - } - - public Optional findFamilyRemainingBytes(Long familyId) { - String key = redisKeyGenerator.generateFamilyRemainingKey(familyId); - return findLongValue(key); - } - - public Optional findCustomerMonthlyUsageBytes(Long familyId, Long customerId) { - String key = redisKeyGenerator.generateFamilyCustomerMonthlyUsageKey(familyId, customerId); - return findLongValue(key); - } - - private Optional findLongValue(String key) { - String raw = familyStringRedisTemplate.opsForValue().get(key); - - if (raw == null || raw.isBlank()) { - return Optional.empty(); - } - - try { - return Optional.of(Long.parseLong(raw)); - } catch (NumberFormatException e) { - return Optional.empty(); - } - } -} diff --git a/src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java b/src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java deleted file mode 100644 index 735e2dc..0000000 --- a/src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.project.domain.family.infra.cache.dto; - -import java.time.LocalDate; - -import com.project.domain.family.entity.Family; - -public record FamilyCacheDto( - Long id, - String name, - Long createdById, - Long totalQuotaBytes, - Long usedBytes, - LocalDate currentMonth) { - - public static FamilyCacheDto from(Family family) { - return new FamilyCacheDto( - family.getId(), - family.getName(), - family.getCreatedById(), - family.getTotalQuotaBytes(), - family.getUsedBytes(), - family.getCurrentMonth()); - } - - public Family toEntity() { - return Family.builder() - .id(id) - .name(name) - .createdById(createdById) - .totalQuotaBytes(totalQuotaBytes) - .usedBytes(usedBytes) - .currentMonth(currentMonth) - .build(); - } -} From b415c1558e5d8d53b7753ed7c41428bad73cc66c Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 18:19:50 +0900 Subject: [PATCH 8/9] =?UTF-8?q?DABOM-508=20chore:=20=EB=AF=B8=EC=82=AC?= =?UTF-8?q?=EC=9A=A9=20=EC=BD=94=EB=93=9C=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SsePublisher에서 미사용 @Slf4j 어노테이션 제거 - CustomerQuotaRepository에서 미사용 findByFamilyId() 메서드 제거 --- .../repository/CustomerQuotaRepository.java | 2 - .../usagerecord/infra/sse/SsePublisher.java | 2 - .../infra/sse/PollingServiceTest.java | 44 ++++++++++--------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java index bb5883d..9bea9c9 100644 --- a/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java +++ b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java @@ -9,7 +9,5 @@ public interface CustomerQuotaRepository extends JpaRepository { - List findByFamilyId(Long familyId); - List findByFamilyIdIn(Collection familyIds); } diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java b/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java index 4e56c7a..43aa2b4 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java @@ -3,11 +3,9 @@ import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; @Service @RequiredArgsConstructor -@Slf4j public class SsePublisher { private final EmitterRegistry emitterRegistry; diff --git a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java index e61e0b7..9ba6410 100644 --- a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java +++ b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java @@ -49,18 +49,20 @@ void sendHeartbeat_calls_EmitterRegistry() { void pollAndPushIfChanged_sendsEvents_whenUsageChanged() { // given Long familyId = 1L; - Family family = Family.builder() - .id(familyId) - .name("test") - .createdById(1L) - .totalQuotaBytes(10000L) - .usedBytes(3000L) - .build(); - CustomerQuota quota = CustomerQuota.builder() - .customerId(10L) - .familyId(familyId) - .monthlyUsedBytes(3000L) - .build(); + Family family = + Family.builder() + .id(familyId) + .name("test") + .createdById(1L) + .totalQuotaBytes(10000L) + .usedBytes(3000L) + .build(); + CustomerQuota quota = + CustomerQuota.builder() + .customerId(10L) + .familyId(familyId) + .monthlyUsedBytes(3000L) + .build(); when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of(family)); @@ -88,18 +90,18 @@ void pollAndPushIfChanged_sendsEvents_whenUsageChanged() { void pollAndPushIfChanged_doesNotSendEvents_whenUsageUnchanged() { // given Long familyId = 1L; - Family family = Family.builder() - .id(familyId) - .name("test") - .createdById(1L) - .totalQuotaBytes(10000L) - .usedBytes(3000L) - .build(); + Family family = + Family.builder() + .id(familyId) + .name("test") + .createdById(1L) + .totalQuotaBytes(10000L) + .usedBytes(3000L) + .build(); when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of(family)); - when(customerQuotaRepository.findByFamilyIdIn(List.of(familyId))) - .thenReturn(List.of()); + when(customerQuotaRepository.findByFamilyIdIn(List.of(familyId))).thenReturn(List.of()); // 첫 호출: 초기값 설정 pollingService.pollAndPushIfChanged(); From 08a0e9ebf28bdb8939e27fe03e4bd576aebd80c2 Mon Sep 17 00:00:00 2001 From: swthewhite Date: Thu, 19 Mar 2026 18:36:14 +0900 Subject: [PATCH 9/9] =?UTF-8?q?DABOM-508=20fix:=20=ED=85=8C=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=20=EC=BD=94=EB=93=9C=EC=97=90=EC=84=9C=20=EB=B6=88?= =?UTF-8?q?=ED=95=84=EC=9A=94=ED=95=9C=20eq()=20=ED=98=B8=EC=B6=9C=20?= =?UTF-8?q?=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 모든 인자가 구체 값인 verify()에서 eq() 래핑 제거 - any()와 혼용되는 곳은 Mockito 규칙상 eq() 유지 --- .../usagerecord/infra/sse/PollingServiceTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java index 9ba6410..c4d2c1c 100644 --- a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java +++ b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java @@ -75,14 +75,14 @@ void pollAndPushIfChanged_sendsEvents_whenUsageChanged() { // then verify(emitterRegistry) .send( - eq(familyId), - eq("usage-updated"), - eq(new RealtimeTotalUsageResponse(familyId, 3000L, 10000L, 7000L))); + familyId, + "usage-updated", + new RealtimeTotalUsageResponse(familyId, 3000L, 10000L, 7000L)); verify(emitterRegistry) .send( - eq(familyId), - eq("usage-updated-by-member"), - eq(new RealtimeUsageByMemberResponse(familyId, 10L, 3000L))); + familyId, + "usage-updated-by-member", + new RealtimeUsageByMemberResponse(familyId, 10L, 3000L)); } @Test