Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/reports/REPORT-embedding-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
## 2. 워커 구조
- 파일: `src/worker/queue-consumer.ts`
- Redis 연결: `REDIS_URL`(우선) 또는 `REDIS_HOST`/`REDIS_PORT`.
- 작업 형식: `{ postId, title?, content?, attempt? }`.
- 작업 형식: `{ postId, title, content, attempt? }` (`title`/`content`는 boolean 플래그).
- 처리 순서
1. `BRPOP` 으로 `EMBEDDING_QUEUE_KEY` 대기.
2. 제목(`storeTitleEmbedding`)과 본문(`chunkText` → `createEmbeddings` → `storeContentEmbeddings`) 순차 처리.
2. 플래그 기준으로 DB에서 게시글을 조회(`findPostById`)하고, 제목(`storeTitleEmbedding`)과 본문(`chunkText` → `createEmbeddings` → `storeContentEmbeddings`)을 필요 시 처리.
3. 오류 시 재시도: `attempt` 증가, `EMBEDDING_WORKER_MAX_RETRIES`, `EMBEDDING_WORKER_BACKOFF_MS` 기반 backoff, 한계를 넘으면 `EMBEDDING_FAILED_QUEUE_KEY` 로 이동.
- 기타: Graceful shutdown(SIGINT/SIGTERM), DebugLogger 로 주요 이벤트 기록.

Expand Down Expand Up @@ -59,7 +59,7 @@ services:
- Secrets(예시): `REDIS_URL`, `REDIS_HOST`, `REDIS_PORT`, `EMBEDDING_QUEUE_KEY`, `EMBEDDING_FAILED_QUEUE_KEY`, `EMBEDDING_WORKER_MAX_RETRIES`, `EMBEDDING_WORKER_BACKOFF_MS` 등.

## 7. 운영 참고 사항
- Spring Boot 프로듀서는 LPUSH 로 작업을 큐에 적재(이미 구현됨).
- Spring Boot 프로듀서는 LPUSH 로 작업을 큐에 적재하며, `title`/`content` 변경 여부를 boolean 값으로 전달한다(이미 구현됨).
- Redis 는 외부 서버/매니지드 환경을 사용; 본 프로젝트 컨테이너에서는 Consumer 역할만 수행.
- 실패 큐(`embedding:failed`) 모니터링 및 재처리(예: RPOP → LPUSH → 재시도 스케줄러) 전략 필요.
- API 컨테이너에서 Redis 변수가 필요하지는 않지만, 비상시 커맨드 오버라이드를 대비해 공통으로 주입해 둔 상태.
Expand Down
65 changes: 53 additions & 12 deletions src/worker/queue-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import {
storeContentEmbeddings,
storeTitleEmbedding,
} from '../services/embedding.service';
import { findPostById } from '../repositories/post.repository';
import { DebugLogger } from '../utils/debug-logger';

type EmbeddingJob = {
postId: number;
title?: string | null;
content?: string | null;
title?: boolean | string | null;
content?: boolean | string | null;
attempt?: number;
metadata?: Record<string, unknown>;
};
Expand Down Expand Up @@ -54,23 +55,60 @@ const processJob = async (job: EmbeddingJob) => {
throw new Error('Invalid postId in embedding job');
}

const title = typeof job.title === 'string' ? job.title.trim() : '';
const content = typeof job.content === 'string' ? job.content.trim() : '';
const parseFlag = (value: boolean | string | null | undefined) => {
if (typeof value === 'boolean') return value;
if (typeof value === 'string') {
const lowered = value.trim().toLowerCase();
if (lowered === 'true') return true;
if (lowered === 'false') return false;
}
return false;
};

const shouldProcessTitle = parseFlag(job.title);
const shouldProcessContent = parseFlag(job.content);

if (!title && !content) {
DebugLogger.warn('server', {
type: 'worker.job.skipped',
postId,
reason: 'empty_payload',
if (!shouldProcessTitle && !shouldProcessContent) {
DebugLogger.warn('server', {
type: 'worker.job.skipped',
postId,
reason: 'no_targets',
});
return;
}

if (title) {
await storeTitleEmbedding(postId, title);
const post = await findPostById(postId);

if (!post) {
throw new Error(`Post ${postId} not found`);
}

if (content) {
const title = typeof post.title === 'string' ? post.title.trim() : '';
const content = typeof post.content === 'string' ? post.content.trim() : '';

if (shouldProcessTitle) {
if (!title) {
DebugLogger.warn('server', {
type: 'worker.job.skipped',
postId,
reason: 'empty_title',
});
} else {
await storeTitleEmbedding(postId, title);
console.log(`[embedding-worker] stored title embedding for post ${postId}`);
}
}

if (shouldProcessContent) {
if (!content) {
DebugLogger.warn('server', {
type: 'worker.job.skipped',
postId,
reason: 'empty_content',
});
return;
}

const chunks = chunkText(content);

if (!chunks.length) {
Expand All @@ -84,6 +122,9 @@ const processJob = async (job: EmbeddingJob) => {

const embeddings = await createEmbeddings(chunks);
await storeContentEmbeddings(postId, chunks, embeddings);
console.log(
`[embedding-worker] stored content embeddings for post ${postId} (chunks=${chunks.length})`
);
}
};

Expand Down