diff --git a/src/main/java/com/project/common/config/RedisConfig.java b/src/main/java/com/project/common/config/RedisConfig.java deleted file mode 100644 index 1439bad..0000000 --- a/src/main/java/com/project/common/config/RedisConfig.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.project.common.config; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; -import org.springframework.data.redis.serializer.StringRedisSerializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.project.domain.family.infra.cache.dto.FamilyCacheDto; - -@Configuration -public class RedisConfig { - - @Bean - public RedisTemplate familyStringRedisTemplate( - RedisConnectionFactory connectionFactory) { - RedisTemplate template = new RedisTemplate<>(); - template.setConnectionFactory(connectionFactory); - - StringRedisSerializer serializer = new StringRedisSerializer(); - template.setKeySerializer(serializer); - template.setHashKeySerializer(serializer); - template.setValueSerializer(serializer); - template.setHashValueSerializer(serializer); - - template.afterPropertiesSet(); - return template; - } - - @Bean - public RedisTemplate familyCacheRedisTemplate( - RedisConnectionFactory connectionFactory) { - RedisTemplate template = new RedisTemplate<>(); - template.setConnectionFactory(connectionFactory); - - StringRedisSerializer keySerializer = new StringRedisSerializer(); - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.registerModule(new JavaTimeModule()); - objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - - Jackson2JsonRedisSerializer valueSerializer = - new Jackson2JsonRedisSerializer(objectMapper, FamilyCacheDto.class); - - template.setKeySerializer(keySerializer); - template.setHashKeySerializer(keySerializer); - template.setValueSerializer(valueSerializer); - template.setHashValueSerializer(valueSerializer); - - template.afterPropertiesSet(); - return template; - } -} diff --git a/src/main/java/com/project/common/util/RedisKeyGenerator.java b/src/main/java/com/project/common/util/RedisKeyGenerator.java deleted file mode 100644 index b6fd416..0000000 --- a/src/main/java/com/project/common/util/RedisKeyGenerator.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.project.common.util; - -import org.springframework.stereotype.Component; - -@Component -public class RedisKeyGenerator { - - private static final String KEY_SEPARATOR = ":"; - private static final String EXAMPLE_KEY_PREFIX = "example"; - private static final String FAMILY_KEY_PREFIX = "family"; - - public String generateExampleKey(Long exampleId) { - return EXAMPLE_KEY_PREFIX + KEY_SEPARATOR + exampleId; - } - - public String generateFamilyInfoKey(Long familyId) { - return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "info"; - } - - public String generateFamilyRemainingKey(Long familyId) { - return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "remaining"; - } - - public String generateFamilyCustomerMonthlyUsageKey(Long familyId, Long customerId) { - return FAMILY_KEY_PREFIX - + KEY_SEPARATOR - + familyId - + KEY_SEPARATOR - + "customer" - + KEY_SEPARATOR - + customerId - + KEY_SEPARATOR - + "usage" - + KEY_SEPARATOR - + "monthly"; - } - - public String generateFamilyKey(Long familyId) { - return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId; - } -} diff --git a/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java new file mode 100644 index 0000000..9bea9c9 --- /dev/null +++ b/src/main/java/com/project/domain/customer/repository/CustomerQuotaRepository.java @@ -0,0 +1,13 @@ +package com.project.domain.customer.repository; + +import java.util.Collection; +import java.util.List; + +import org.springframework.data.jpa.repository.JpaRepository; + +import com.project.domain.customer.entity.CustomerQuota; + +public interface CustomerQuotaRepository extends JpaRepository { + + List findByFamilyIdIn(Collection familyIds); +} diff --git a/src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java b/src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java deleted file mode 100644 index 1629fc7..0000000 --- a/src/main/java/com/project/domain/family/infra/cache/FamilyCacheRepository.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.project.domain.family.infra.cache; - -import java.util.Optional; - -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Repository; - -import com.project.common.util.RedisKeyGenerator; -import com.project.domain.family.entity.Family; -import com.project.domain.family.infra.cache.dto.FamilyCacheDto; - -import lombok.RequiredArgsConstructor; - -@Repository -@RequiredArgsConstructor -public class FamilyCacheRepository { - - private final RedisTemplate familyStringRedisTemplate; - private final RedisTemplate familyCacheRedisTemplate; - private final RedisKeyGenerator redisKeyGenerator; - - public void save(Family family) { - String key = redisKeyGenerator.generateFamilyInfoKey(family.getId()); - familyCacheRedisTemplate.opsForValue().set(key, FamilyCacheDto.from(family)); - } - - public Optional findById(Long familyId) { - String key = redisKeyGenerator.generateFamilyInfoKey(familyId); - FamilyCacheDto dto = familyCacheRedisTemplate.opsForValue().get(key); - - if (dto == null) { - return Optional.empty(); - } - - return Optional.of(dto.toEntity()); - } - - public void evict(Long familyId) { - String key = redisKeyGenerator.generateFamilyInfoKey(familyId); - familyCacheRedisTemplate.delete(key); - } - - public Optional findFamilyRemainingBytes(Long familyId) { - String key = redisKeyGenerator.generateFamilyRemainingKey(familyId); - return findLongValue(key); - } - - public Optional findCustomerMonthlyUsageBytes(Long familyId, Long customerId) { - String key = redisKeyGenerator.generateFamilyCustomerMonthlyUsageKey(familyId, customerId); - return findLongValue(key); - } - - private Optional findLongValue(String key) { - String raw = familyStringRedisTemplate.opsForValue().get(key); - - if (raw == null || raw.isBlank()) { - return Optional.empty(); - } - - try { - return Optional.of(Long.parseLong(raw)); - } catch (NumberFormatException e) { - return Optional.empty(); - } - } -} diff --git a/src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java b/src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java deleted file mode 100644 index 735e2dc..0000000 --- a/src/main/java/com/project/domain/family/infra/cache/dto/FamilyCacheDto.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.project.domain.family.infra.cache.dto; - -import java.time.LocalDate; - -import com.project.domain.family.entity.Family; - -public record FamilyCacheDto( - Long id, - String name, - Long createdById, - Long totalQuotaBytes, - Long usedBytes, - LocalDate currentMonth) { - - public static FamilyCacheDto from(Family family) { - return new FamilyCacheDto( - family.getId(), - family.getName(), - family.getCreatedById(), - family.getTotalQuotaBytes(), - family.getUsedBytes(), - family.getCurrentMonth()); - } - - public Family toEntity() { - return Family.builder() - .id(id) - .name(name) - .createdById(createdById) - .totalQuotaBytes(totalQuotaBytes) - .usedBytes(usedBytes) - .currentMonth(currentMonth) - .build(); - } -} diff --git a/src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java b/src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java deleted file mode 100644 index 45d616d..0000000 --- a/src/main/java/com/project/domain/usagerecord/infra/messaging/UsageRecordKafkaConsumer.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.project.domain.usagerecord.infra.messaging; - -import java.time.LocalDateTime; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.stereotype.Component; - -import com.dabom.messaging.kafka.contract.KafkaEventTypes; -import com.dabom.messaging.kafka.contract.KafkaTopics; -import com.dabom.messaging.kafka.event.KafkaEventMessageSupport; -import com.dabom.messaging.kafka.event.dto.EventEnvelope; -import com.dabom.messaging.kafka.event.dto.usage.UsageRealtimePayload; -import com.fasterxml.jackson.core.type.TypeReference; -import com.project.domain.usagerecord.infra.messaging.config.UsageRealtimeBroadcastKafkaConfig; -import com.project.domain.usagerecord.infra.sse.SsePublisher; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -@Component -@RequiredArgsConstructor -public class UsageRecordKafkaConsumer { - - private final KafkaEventMessageSupport kafkaEventMessageSupport; - private final SsePublisher ssePublisher; - - @KafkaListener( - topics = KafkaTopics.USAGE_REALTIME, - containerFactory = - UsageRealtimeBroadcastKafkaConfig - .USAGE_REALTIME_BROADCAST_KAFKA_LISTENER_CONTAINER_FACTORY) - public void consume(ConsumerRecord record) { - kafkaEventMessageSupport.consumeByEventType( - record, - KafkaEventTypes.USAGE_REALTIME, - new TypeReference>() {}, - (envelope, key) -> { - UsageRealtimePayload payload = envelope.payload(); - LocalDateTime publishTime = envelope.timestamp(); - - log.info( - "FamilyId:{}, totalUsedBytes:{}", - payload.familyId(), - payload.totalUsedBytes()); - - ssePublisher.pushMemberUsageBytes(payload, publishTime); - ssePublisher.pushTotalUsageBytes(payload, publishTime); - }); - } -} diff --git a/src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java b/src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java deleted file mode 100644 index 265caba..0000000 --- a/src/main/java/com/project/domain/usagerecord/infra/messaging/config/UsageRealtimeBroadcastKafkaConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.project.domain.usagerecord.infra.messaging.config; - -import java.util.UUID; - -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.CommonErrorHandler; - -import com.dabom.messaging.kafka.autoconfigure.KafkaConfig; -import com.dabom.messaging.kafka.autoconfigure.KafkaErrorHandlerConfig; -import com.dabom.messaging.kafka.error.KafkaExceptionClassifier; -import com.dabom.messaging.kafka.event.KafkaEventMessageSupport; -import com.dabom.messaging.kafka.event.publisher.DefaultKafkaEventPublisher; -import com.dabom.messaging.kafka.metrics.KafkaMetrics; -import com.dabom.messaging.kafka.metrics.consumer.KafkaMetricsRecordInterceptor; -import com.dabom.messaging.kafka.metrics.producer.KafkaMetricsProducerListener; -import com.dabom.messaging.kafka.support.KafkaEventMetadataExtractor; -import com.dabom.messaging.kafka.support.KafkaLogSanitizer; - -@Configuration -@Import({ - KafkaConfig.class, - KafkaErrorHandlerConfig.class, - KafkaMetrics.class, - KafkaExceptionClassifier.class, - KafkaEventMetadataExtractor.class, - KafkaLogSanitizer.class, - KafkaEventMessageSupport.class, - KafkaMetricsRecordInterceptor.class, - KafkaMetricsProducerListener.class, - DefaultKafkaEventPublisher.class -}) -public class UsageRealtimeBroadcastKafkaConfig { - - public static final String USAGE_REALTIME_BROADCAST_KAFKA_LISTENER_CONTAINER_FACTORY = - "usageRealtimeBroadcastKafkaListenerContainerFactory"; - - private static final int GROUP_ID_SUFFIX_LENGTH = 8; - - @Bean(USAGE_REALTIME_BROADCAST_KAFKA_LISTENER_CONTAINER_FACTORY) - public ConcurrentKafkaListenerContainerFactory - usageRealtimeBroadcastKafkaListenerContainerFactory( - ConsumerFactory consumerFactory, - CommonErrorHandler kafkaCommonErrorHandler, - KafkaMetricsRecordInterceptor kafkaMetricsRecordInterceptor, - @Value("${spring.kafka.broadcast.group-id-prefix:usage-realtime}") - String broadcastGroupIdPrefix) { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - factory.getContainerProperties() - .setGroupId( - broadcastGroupIdPrefix - + "-" - + UUID.randomUUID() - .toString() - .substring(0, GROUP_ID_SUFFIX_LENGTH)); - factory.getContainerProperties().setObservationEnabled(true); - factory.setRecordInterceptor(kafkaMetricsRecordInterceptor); - factory.setCommonErrorHandler(kafkaCommonErrorHandler); - return factory; - } -} diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java index bcadb7e..fb7933d 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/PollingService.java @@ -1,64 +1,97 @@ package com.project.domain.usagerecord.infra.sse; -import java.time.LocalDateTime; -import java.util.Optional; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Collectors; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; -import com.dabom.messaging.kafka.event.dto.usage.UsageRealtimePayload; -import com.project.domain.family.infra.cache.FamilyCacheRepository; +import com.project.domain.customer.entity.CustomerQuota; +import com.project.domain.customer.repository.CustomerQuotaRepository; +import com.project.domain.family.entity.Family; +import com.project.domain.family.repository.FamilyRepository; +import com.project.domain.usagerecord.dto.response.RealtimeTotalUsageResponse; +import com.project.domain.usagerecord.dto.response.RealtimeUsageByMemberResponse; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +@Slf4j @Service @RequiredArgsConstructor public class PollingService { + private static final long POLLING_DELAY_MS = 1_000L; private static final long HEARTBEAT_DELAY_MS = 25_000L; + private static final String EVENT_USAGE_UPDATED = "usage-updated"; + private static final String EVENT_USAGE_UPDATED_BY_MEMBER = "usage-updated-by-member"; private final EmitterRegistry emitterRegistry; - private final FamilyCacheRepository familyCacheRepository; - private final ConcurrentHashMap lastSeen = new ConcurrentHashMap<>(); - private final SsePublisher ssePublisher; + private final FamilyRepository familyRepository; + private final CustomerQuotaRepository customerQuotaRepository; - // 1) 활성 familyId를 순회하며 최신 잔여 용량을 조회합니다. - // 2) 이전 값과 다를 때만 페이로드를 생성해 SSE로 전송합니다. - @Scheduled(fixedDelay = 1000) + private final ConcurrentHashMap lastSeenUsedBytes = new ConcurrentHashMap<>(); + + // 1) 활성 familyId 전체를 일괄 조회하여 N+1 문제를 방지합니다. + // 2) 이전 값과 다른 family만 필터링하여 SSE로 전송합니다. + @Transactional(readOnly = true) + @Scheduled(fixedDelay = POLLING_DELAY_MS) public void pollAndPushIfChanged() { - for (Long familyId : emitterRegistry.activeFamilyIds()) { - Optional latestOpt = familyCacheRepository.findFamilyRemainingBytes(familyId); - if (latestOpt.isEmpty()) { - lastSeen.remove(familyId); + Set activeFamilyIds = emitterRegistry.activeFamilyIds(); + if (activeFamilyIds.isEmpty()) { + return; + } + + Map familiesById = + familyRepository.findAllById(activeFamilyIds).stream() + .collect(Collectors.toMap(Family::getId, Function.identity())); + + List changedFamilyIds = new ArrayList<>(); + for (Long familyId : activeFamilyIds) { + Family family = familiesById.get(familyId); + if (family == null) { + lastSeenUsedBytes.remove(familyId); continue; } - long remainingBytes = latestOpt.get(); - Long prev = lastSeen.putIfAbsent(familyId, remainingBytes); - - if (prev == null || prev.longValue() != remainingBytes) { - lastSeen.put(familyId, remainingBytes); - - // 임시로 고정 - long totalLimitBytes = 20000; - long totalUsedBytes = totalLimitBytes - remainingBytes; - - UsageRealtimePayload payload = - new UsageRealtimePayload( - familyId, - 4L, - totalUsedBytes, - totalLimitBytes, - remainingBytes, - 30.0, - null, - null, - null); - - LocalDateTime now = LocalDateTime.now(); - ssePublisher.pushTotalUsageBytes(payload, now); - ssePublisher.pushMemberUsageBytes(payload, now); + long usedBytes = family.getUsedBytes(); + Long prev = lastSeenUsedBytes.get(familyId); + + if (prev == null || prev.longValue() != usedBytes) { + lastSeenUsedBytes.put(familyId, usedBytes); + changedFamilyIds.add(familyId); + + long totalQuotaBytes = family.getTotalQuotaBytes(); + long remainingBytes = totalQuotaBytes - usedBytes; + + RealtimeTotalUsageResponse totalResponse = + new RealtimeTotalUsageResponse( + familyId, usedBytes, totalQuotaBytes, remainingBytes); + emitterRegistry.send(familyId, EVENT_USAGE_UPDATED, totalResponse); + } + } + + if (changedFamilyIds.isEmpty()) { + return; + } + + Map> quotasByFamilyId = + customerQuotaRepository.findByFamilyIdIn(changedFamilyIds).stream() + .collect(Collectors.groupingBy(CustomerQuota::getFamilyId)); + + for (Long familyId : changedFamilyIds) { + List quotas = quotasByFamilyId.getOrDefault(familyId, List.of()); + for (CustomerQuota quota : quotas) { + RealtimeUsageByMemberResponse memberResponse = + new RealtimeUsageByMemberResponse( + familyId, quota.getCustomerId(), quota.getMonthlyUsedBytes()); + emitterRegistry.send(familyId, EVENT_USAGE_UPDATED_BY_MEMBER, memberResponse); } } } diff --git a/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java b/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java index 2b0d9ae..43aa2b4 100644 --- a/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java +++ b/src/main/java/com/project/domain/usagerecord/infra/sse/SsePublisher.java @@ -1,114 +1,16 @@ package com.project.domain.usagerecord.infra.sse; -import java.time.LocalDateTime; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import com.dabom.messaging.kafka.event.dto.usage.UsageRealtimePayload; -import com.project.domain.usagerecord.dto.response.RealtimeTotalUsageResponse; -import com.project.domain.usagerecord.dto.response.RealtimeUsageByMemberResponse; - import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; @Service @RequiredArgsConstructor -@Slf4j public class SsePublisher { private final EmitterRegistry emitterRegistry; - private final ConcurrentHashMap> - lastTotalBytesTimeByFamily = new ConcurrentHashMap<>(); - - private final ConcurrentHashMap> - lastTotalMemberBytesTimeByFamily = new ConcurrentHashMap<>(); - - // 1) 가족별 최신 발행 시각보다 오래된 이벤트는 폐기합니다. - // 2) 최신 이벤트만 응답 DTO로 변환해 SSE로 전송합니다. - @Async - public void pushTotalUsageBytes(UsageRealtimePayload payload, LocalDateTime publishedDateTime) { - log.info("pushTotalUsageBytes thread : {}", Thread.currentThread().getName()); - Long familyId = payload.familyId(); - AtomicReference lastRef = totalTsRef(familyId); - - while (true) { - LocalDateTime current = lastRef.get(); - - if (!publishedDateTime.isAfter(current)) { - log.info( - "drop older total event: familyId={}, incoming={}, last={}", - familyId, - publishedDateTime, - current); - return; - } - - if (lastRef.compareAndSet(current, publishedDateTime)) { - break; - } - } - - RealtimeTotalUsageResponse response = - new RealtimeTotalUsageResponse( - payload.familyId(), - payload.totalUsedBytes(), - payload.totalLimitBytes(), - payload.remainingBytes()); - - emitterRegistry.send(familyId, "usage-updated", response); - } - - // 1) 가족별 멤버 사용량 이벤트의 최신 시각을 CAS로 보장합니다. - // 2) 최신 이벤트만 멤버 단위 응답으로 변환해 SSE로 전송합니다. - @Async - public void pushMemberUsageBytes( - UsageRealtimePayload payload, LocalDateTime publishedDateTime) { - log.info("pushMemberUsageBytes thread : {}", Thread.currentThread().getName()); - - Long familyId = payload.familyId(); - Long customerId = payload.customerId(); - AtomicReference lastRef = memberTsRef(familyId); - - while (true) { - LocalDateTime current = lastRef.get(); - - if (!publishedDateTime.isAfter(current)) { - log.info( - "drop older member event: familyId={}, customerId={}, incoming={}, last={}", - familyId, - customerId, - publishedDateTime, - current); - return; - } - - if (lastRef.compareAndSet(current, publishedDateTime)) { - break; - } - } - - RealtimeUsageByMemberResponse response = - new RealtimeUsageByMemberResponse(familyId, customerId, payload.monthlyUsedBytes()); - emitterRegistry.send(familyId, "usage-updated-by-member", response); - } - public void pushNotificationEvent(Long familyId, String eventType, Object payload) { emitterRegistry.send(familyId, eventType, payload); } - - // 가족별 총 사용량 최신 시각 저장소를 초기화하거나 반환합니다. - private AtomicReference totalTsRef(Long familyId) { - return lastTotalBytesTimeByFamily.computeIfAbsent( - familyId, id -> new AtomicReference<>(LocalDateTime.MIN)); - } - - // 가족별 멤버 사용량 최신 시각 저장소를 초기화하거나 반환합니다. - private AtomicReference memberTsRef(Long familyId) { - return lastTotalMemberBytesTimeByFamily.computeIfAbsent( - familyId, id -> new AtomicReference<>(LocalDateTime.MIN)); - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 3af9dc8..e16511c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -40,8 +40,6 @@ spring: consumer: group-id: ${KAFKA_CONSUMER_GROUP_ID:dabom-api-notification} auto-offset-reset: ${KAFKA_AUTO_OFFSET_RESET:earliest} - broadcast: - group-id-prefix: ${KAFKA_BROADCAST_GROUP_PREFIX:usage-realtime} producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer diff --git a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java index a423651..c4d2c1c 100644 --- a/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java +++ b/src/test/java/com/project/domain/usagerecord/infra/sse/PollingServiceTest.java @@ -1,6 +1,13 @@ package com.project.domain.usagerecord.infra.sse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -9,16 +16,21 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.project.domain.family.infra.cache.FamilyCacheRepository; +import com.project.domain.customer.entity.CustomerQuota; +import com.project.domain.customer.repository.CustomerQuotaRepository; +import com.project.domain.family.entity.Family; +import com.project.domain.family.repository.FamilyRepository; +import com.project.domain.usagerecord.dto.response.RealtimeTotalUsageResponse; +import com.project.domain.usagerecord.dto.response.RealtimeUsageByMemberResponse; @ExtendWith(MockitoExtension.class) class PollingServiceTest { @Mock private EmitterRegistry emitterRegistry; - @Mock private FamilyCacheRepository familyCacheRepository; + @Mock private FamilyRepository familyRepository; - @Mock private SsePublisher ssePublisher; + @Mock private CustomerQuotaRepository customerQuotaRepository; @InjectMocks private PollingService pollingService; @@ -31,4 +43,91 @@ void sendHeartbeat_calls_EmitterRegistry() { // then verify(emitterRegistry).sendHeartbeat(); } + + @Test + @DisplayName("사용량이 변경되면 usage-updated와 usage-updated-by-member 이벤트를 전송합니다.") + void pollAndPushIfChanged_sendsEvents_whenUsageChanged() { + // given + Long familyId = 1L; + Family family = + Family.builder() + .id(familyId) + .name("test") + .createdById(1L) + .totalQuotaBytes(10000L) + .usedBytes(3000L) + .build(); + CustomerQuota quota = + CustomerQuota.builder() + .customerId(10L) + .familyId(familyId) + .monthlyUsedBytes(3000L) + .build(); + + when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); + when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of(family)); + when(customerQuotaRepository.findByFamilyIdIn(List.of(familyId))) + .thenReturn(List.of(quota)); + + // when + pollingService.pollAndPushIfChanged(); + + // then + verify(emitterRegistry) + .send( + familyId, + "usage-updated", + new RealtimeTotalUsageResponse(familyId, 3000L, 10000L, 7000L)); + verify(emitterRegistry) + .send( + familyId, + "usage-updated-by-member", + new RealtimeUsageByMemberResponse(familyId, 10L, 3000L)); + } + + @Test + @DisplayName("사용량 변경이 없으면 이벤트를 전송하지 않습니다.") + void pollAndPushIfChanged_doesNotSendEvents_whenUsageUnchanged() { + // given + Long familyId = 1L; + Family family = + Family.builder() + .id(familyId) + .name("test") + .createdById(1L) + .totalQuotaBytes(10000L) + .usedBytes(3000L) + .build(); + + when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); + when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of(family)); + when(customerQuotaRepository.findByFamilyIdIn(List.of(familyId))).thenReturn(List.of()); + + // 첫 호출: 초기값 설정 + pollingService.pollAndPushIfChanged(); + + // when: 두 번째 호출 (값 동일) + pollingService.pollAndPushIfChanged(); + + // then: usage-updated는 첫 호출에서 1번만 발생 + verify(emitterRegistry) + .send(eq(familyId), eq("usage-updated"), any(RealtimeTotalUsageResponse.class)); + } + + @Test + @DisplayName("familyId에 해당하는 Family가 없으면 이벤트를 전송하지 않습니다.") + void pollAndPushIfChanged_doesNotSendEvents_whenFamilyNotFound() { + // given + Long familyId = 999L; + + when(emitterRegistry.activeFamilyIds()).thenReturn(Set.of(familyId)); + when(familyRepository.findAllById(Set.of(familyId))).thenReturn(List.of()); + + // when + pollingService.pollAndPushIfChanged(); + + // then + verify(emitterRegistry, never()).send(any(), eq("usage-updated"), any()); + verify(emitterRegistry, never()).send(any(), eq("usage-updated-by-member"), any()); + } }