diff --git a/src/main/java/com/techfork/domain/post/repository/PostRepository.java b/src/main/java/com/techfork/domain/post/repository/PostRepository.java index e175897a..afeca712 100644 --- a/src/main/java/com/techfork/domain/post/repository/PostRepository.java +++ b/src/main/java/com/techfork/domain/post/repository/PostRepository.java @@ -13,10 +13,11 @@ import java.time.LocalDateTime; import java.util.List; import java.util.Optional; +import java.util.Set; public interface PostRepository extends JpaRepository { - - boolean existsByUrl(String url); + @Query("SELECT p.url FROM Post p WHERE p.url IN :urls") + Set findExistingUrls(@Param("urls") List urls); @Query(""" SELECT p FROM Post p diff --git a/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java b/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java index 9aaaa045..374bb78c 100644 --- a/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java +++ b/src/main/java/com/techfork/domain/source/batch/RssFeedReader.java @@ -4,6 +4,7 @@ import com.rometools.rome.feed.synd.SyndFeed; import com.rometools.rome.io.SyndFeedInput; import com.rometools.rome.io.XmlReader; +import com.techfork.domain.post.repository.PostRepository; import com.techfork.domain.source.dto.RssFeedItem; import com.techfork.domain.source.entity.TechBlog; import com.techfork.domain.source.repository.TechBlogRepository; @@ -21,7 +22,8 @@ import java.time.ZoneId; import java.util.Date; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Set; +import java.util.stream.Stream; @Slf4j @Component @@ -30,59 +32,52 @@ public class RssFeedReader implements ItemReader { private final TechBlogRepository techBlogRepository; + private final PostRepository postRepository; private final WebClient webClient; - private ConcurrentLinkedQueue itemQueue; + private List items; + private int currentIndex = 0; @Override public RssFeedItem read() { - // 첫 실행 시 모든 RSS 아이템을 큐에 추가 - if (itemQueue == null) { - initializeQueue(); + if (items == null) { + initializeItems(); } - // 큐에서 아이템 꺼내기 (Thread-Safe) - RssFeedItem item = itemQueue.poll(); - - if (item == null) { - log.info("모든 RSS 피드 수집 완료"); + if (currentIndex >= items.size()) { + log.info("모든 RSS 피드 수집 완료: 총 {}개", items.size()); + return null; } - return item; + return items.get(currentIndex++); } - /** - * 모든 RSS 피드를 미리 수집하여 큐에 저장 - * 한 번만 실행되며, 여러 스레드가 큐에서 안전하게 아이템을 가져감 - */ - private synchronized void initializeQueue() { - // Double-checked locking - if (itemQueue != null) { - return; - } - - itemQueue = new ConcurrentLinkedQueue<>(); + private void initializeItems() { List techBlogs = techBlogRepository.findAll(); log.info("총 {}개 테크 블로그 RSS 수집 시작", techBlogs.size()); - int totalItems = 0; - for (TechBlog techBlog : techBlogs) { - try { - List items = fetchRssFeed(techBlog); - if (!items.isEmpty()) { - itemQueue.addAll(items); - totalItems += items.size(); - log.info("[{}] RSS 수집 성공: {}개 아이템", techBlog.getCompanyName(), items.size()); - } else { - log.warn("[{}] RSS 피드에 아이템이 없습니다", techBlog.getCompanyName()); - } - } catch (Exception e) { - log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage(), e); - // 실패해도 다음 블로그 계속 처리 - } - } - - log.info("RSS 수집 초기화 완료: 총 {}개 아이템을 큐에 추가", totalItems); + List allItems = techBlogs.parallelStream() + .flatMap(techBlog -> { + try { + List feedItems = fetchRssFeed(techBlog); + log.info("[{}] RSS 수집 성공: {}개", techBlog.getCompanyName(), feedItems.size()); + return feedItems.stream(); + } catch (Exception e) { + log.error("[{}] RSS 수집 실패: {}", techBlog.getCompanyName(), e.getMessage()); + return Stream.empty(); + } + }) + .toList(); + + Set existingUrls = postRepository.findExistingUrls( + allItems.stream().map(RssFeedItem::url).toList() + ); + + items = allItems.stream() + .filter(item -> !existingUrls.contains(item.url())) + .toList(); + + log.info("RSS 수집 초기화 완료: 총 {}개 아이템", items.size()); } private List fetchRssFeed(TechBlog techBlog) throws Exception { @@ -165,4 +160,4 @@ private LocalDateTime convertToLocalDateTime(Date date) { .atZone(ZoneId.systemDefault()) .toLocalDateTime(); } -} +} \ No newline at end of file diff --git a/src/main/java/com/techfork/domain/source/batch/RssToPostProcessor.java b/src/main/java/com/techfork/domain/source/batch/RssToPostProcessor.java index 83ec98e1..9d716c7b 100644 --- a/src/main/java/com/techfork/domain/source/batch/RssToPostProcessor.java +++ b/src/main/java/com/techfork/domain/source/batch/RssToPostProcessor.java @@ -1,7 +1,6 @@ package com.techfork.domain.source.batch; import com.techfork.domain.post.entity.Post; -import com.techfork.domain.post.repository.PostRepository; import com.techfork.domain.source.dto.RssFeedItem; import com.techfork.domain.source.entity.TechBlog; import com.techfork.domain.source.repository.TechBlogRepository; @@ -13,7 +12,6 @@ /** * RssFeedItem을 Post 엔티티로 변환하는 Processor - * 중복 체크도 여기서 수행하여 이미 존재하는 URL은 null 반환 */ @Slf4j @Component @@ -21,21 +19,11 @@ @RequiredArgsConstructor public class RssToPostProcessor implements ItemProcessor { - private final PostRepository postRepository; private final TechBlogRepository techBlogRepository; @Override public Post process(RssFeedItem item) { - // 중복 체크 - if (postRepository.existsByUrl(item.url())) { - log.debug("중복 URL 스킵: {}", item.url()); - return null; // null 반환 시 Writer에서 처리 안 함 - } - - TechBlog techBlog = techBlogRepository.findById(item.techBlogId()) - .orElseThrow(() -> new IllegalStateException( - "TechBlog를 찾을 수 없습니다. ID: " + item.techBlogId())); - + TechBlog techBlog = techBlogRepository.getReferenceById(item.techBlogId()); return Post.create(item, techBlog); } } diff --git a/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java b/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java index 3c18b14d..10bc549f 100644 --- a/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java +++ b/src/main/java/com/techfork/domain/source/config/RssCrawlingJobConfig.java @@ -74,8 +74,6 @@ public Step fetchAndSaveRssStep() { .reader(rssFeedReader) .processor(rssToPostProcessor) .writer(postBatchWriter) - // 병렬 처리: 5개 스레드로 동시에 RSS 수집 - .taskExecutor(rssTaskExecutor()) .faultTolerant() // 건너뛰기 정책: 최대 10개 아이템까지 건너뛰기 허용 .skipLimit(10) @@ -118,19 +116,6 @@ public Step embedAndIndexStep() { .build(); } - @Bean - public TaskExecutor rssTaskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(5); - executor.setMaxPoolSize(10); - executor.setQueueCapacity(20); - executor.setThreadNamePrefix("rss-crawl-"); - executor.setWaitForTasksToCompleteOnShutdown(true); - executor.setAwaitTerminationSeconds(60); - executor.initialize(); - return executor; - } - @Bean public TaskExecutor embeddingTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); diff --git a/src/main/java/com/techfork/domain/source/scheduler/RssCrawlingScheduler.java b/src/main/java/com/techfork/domain/source/scheduler/RssCrawlingScheduler.java index 8385cf3b..06ae7917 100644 --- a/src/main/java/com/techfork/domain/source/scheduler/RssCrawlingScheduler.java +++ b/src/main/java/com/techfork/domain/source/scheduler/RssCrawlingScheduler.java @@ -14,7 +14,7 @@ /** * RSS 크롤링 스케줄러 - * - 1시간마다 RSS 피드 크롤링 실행 + * - 24시간마다 RSS 피드 크롤링 실행 * - Redis 분산 락으로 중복 실행 방지 */ @Slf4j @@ -31,7 +31,7 @@ public class RssCrawlingScheduler { /** * 매일 오전 5시마다 RSS 크롤링 실행 - * cron: 0 0 5 * * * -> 매 시간 정각 + * cron: 0 0 5 * * * -> 매일 오전 5시 */ @Scheduled(cron = "0 0 5 * * *") public void scheduleCrawling() { @@ -50,17 +50,11 @@ public void scheduleCrawling() { log.error("Unexpected error during scheduled crawling", e); } finally { distributedLock.unlock(CRAWLING_LOCK_KEY, lockValue); + cleanupStaleHistories(); } } - /** - * 5분마다 오래된 RUNNING 상태의 이력을 정리 (좀비 프로세스 방지) - * cron: 매 5분마다 실행 - */ - @Scheduled(cron = "0 */5 * * * *") - public void cleanupStaleHistories() { - log.debug("Checking for stale crawling histories"); - + private void cleanupStaleHistories() { var staleHistories = crawlingHistoryRepository.findByStatusAndStartedAtBefore( ECrawlingStatus.RUNNING, java.time.LocalDateTime.now().minusHours(1) ); diff --git a/src/main/java/com/techfork/global/config/WebClientConfig.java b/src/main/java/com/techfork/global/config/WebClientConfig.java index 6edcfae4..ad3d34e9 100644 --- a/src/main/java/com/techfork/global/config/WebClientConfig.java +++ b/src/main/java/com/techfork/global/config/WebClientConfig.java @@ -1,8 +1,6 @@ package com.techfork.global.config; -import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.channel.ChannelOption; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -10,7 +8,6 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.netty.http.client.HttpClient; -import javax.net.ssl.SSLException; import java.time.Duration; @@ -20,34 +17,20 @@ public class WebClientConfig { @Bean public WebClient webClient() { - try { - // SSL Context 생성 - 모든 인증서 신뢰 - SslContext sslContext = SslContextBuilder - .forClient() - .trustManager(InsecureTrustManagerFactory.INSTANCE) - .build(); - - // HttpClient 설정 (Netty 기반) - HttpClient httpClient = HttpClient.create() - .secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)) - .responseTimeout(Duration.ofSeconds(30)) - .followRedirect(true); // Redirect 자동 추적 - - // WebClient 생성 - WebClient webClient = WebClient.builder() - .clientConnector(new ReactorClientHttpConnector(httpClient)) - .defaultHeader("User-Agent", "Mozilla/5.0 (compatible; TechFork-Bot/1.0)") - .defaultHeader("Accept", "application/rss+xml, application/xml, application/atom+xml, text/xml, */*") - .codecs(configurer -> configurer - .defaultCodecs() - .maxInMemorySize(10 * 1024 * 1024)) - .build(); - - return webClient; - - } catch (SSLException e) { - log.error("WebClient 초기화 실패", e); - throw new RuntimeException("WebClient 초기화 실패", e); - } + // HttpClient 설정 (Netty 기반) + HttpClient httpClient = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // 연결 타임아웃 + .responseTimeout(Duration.ofSeconds(30)) // 응답 타임아웃 + .followRedirect(true); // Redirect 자동 추적 + + // WebClient 생성 + return WebClient.builder() + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .defaultHeader("User-Agent", "Mozilla/5.0 (compatible; TechFork-Bot/1.0)") // 봇 차단 방지 + .defaultHeader("Accept", "application/rss+xml, application/xml, application/atom+xml, text/xml, */*") // RSS/XML 콘텐츠 명시 + .codecs(configurer -> configurer + .defaultCodecs() + .maxInMemorySize(10 * 1024 * 1024)) // 큰 RSS 피드 처리 가능 + .build(); } }