Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reports/REPORT-embedding-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
1. `BRPOP` 으로 `EMBEDDING_QUEUE_KEY` 대기.
2. 플래그 기준으로 DB에서 게시글을 조회(`findPostById`)하고, 제목(`storeTitleEmbedding`)과 본문(`chunkText` → `createEmbeddings` → `storeContentEmbeddings`)을 필요 시 처리.
3. 오류 시 재시도: `attempt` 증가, `EMBEDDING_WORKER_MAX_RETRIES`, `EMBEDDING_WORKER_BACKOFF_MS` 기반 backoff, 한계를 넘으면 `EMBEDDING_FAILED_QUEUE_KEY` 로 이동.
- 기타: Graceful shutdown(SIGINT/SIGTERM), DebugLogger 로 주요 이벤트 기록.
- 기타: Graceful shutdown(SIGINT/SIGTERM), 콘솔 로그로 주요 이벤트 기록.

## 3. 환경 변수 (추가 항목)
| 키 | 용도 | 기본값 |
Expand Down
37 changes: 18 additions & 19 deletions src/worker/queue-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
storeTitleEmbedding,
} from '../services/embedding.service';
import { findPostById } from '../repositories/post.repository';
import { DebugLogger } from '../utils/debug-logger';

type EmbeddingJob = {
postId: number;
Expand Down Expand Up @@ -37,7 +36,7 @@ let shuttingDown = false;
const handleShutdown = (signal: NodeJS.Signals) => {
if (shuttingDown) return;
shuttingDown = true;
DebugLogger.warn('server', { type: 'worker.shutdown', signal });
console.warn('[embedding-worker]', { type: 'worker.shutdown', signal });
try {
redis.disconnect();
} catch {
Expand Down Expand Up @@ -68,11 +67,11 @@ const processJob = async (job: EmbeddingJob) => {
const shouldProcessTitle = parseFlag(job.title);
const shouldProcessContent = parseFlag(job.content);

if (!shouldProcessTitle && !shouldProcessContent) {
DebugLogger.warn('server', {
type: 'worker.job.skipped',
postId,
reason: 'no_targets',
if (!shouldProcessTitle && !shouldProcessContent) {
console.warn('[embedding-worker]', {
type: 'worker.job.skipped',
postId,
reason: 'no_targets',
});
return;
}
Expand All @@ -88,7 +87,7 @@ const processJob = async (job: EmbeddingJob) => {

if (shouldProcessTitle) {
if (!title) {
DebugLogger.warn('server', {
console.warn('[embedding-worker]', {
type: 'worker.job.skipped',
postId,
reason: 'empty_title',
Expand All @@ -101,7 +100,7 @@ const processJob = async (job: EmbeddingJob) => {

if (shouldProcessContent) {
if (!content) {
DebugLogger.warn('server', {
console.warn('[embedding-worker]', {
type: 'worker.job.skipped',
postId,
reason: 'empty_content',
Expand All @@ -112,7 +111,7 @@ const processJob = async (job: EmbeddingJob) => {
const chunks = chunkText(content);

if (!chunks.length) {
DebugLogger.warn('server', {
console.warn('[embedding-worker]', {
type: 'worker.job.skipped',
postId,
reason: 'no_chunks',
Expand All @@ -138,7 +137,7 @@ const pushToFailedQueue = async (payload: unknown) => {
})
);
} catch (error) {
DebugLogger.error('server', {
console.error('[embedding-worker]', {
type: 'worker.failed_queue_error',
message: (error as Error)?.message ?? 'unknown',
});
Expand All @@ -150,7 +149,7 @@ const handlePayload = async (rawPayload: string) => {
try {
job = JSON.parse(rawPayload) as EmbeddingJob;
} catch (error) {
DebugLogger.error('server', {
console.error('[embedding-worker]', {
type: 'worker.job.invalid_json',
error: (error as Error)?.message ?? 'invalid_json',
});
Expand All @@ -159,15 +158,15 @@ const handlePayload = async (rawPayload: string) => {
}

const attempt = Number(job.attempt || 0);
DebugLogger.log('server', {
console.log('[embedding-worker]', {
type: 'worker.job.start',
postId: job.postId,
attempt,
});

try {
await processJob(job);
DebugLogger.log('server', {
console.log('[embedding-worker]', {
type: 'worker.job.success',
postId: job.postId,
});
Expand All @@ -182,7 +181,7 @@ const handlePayload = async (rawPayload: string) => {
};

if (nextAttempt < maxRetries) {
DebugLogger.warn('server', {
console.warn('[embedding-worker]', {
type: 'worker.job.retry',
postId: job.postId,
attempt: nextAttempt,
Expand All @@ -193,7 +192,7 @@ const handlePayload = async (rawPayload: string) => {
}
await redis.lpush(queueKey, JSON.stringify(enrichedPayload));
} else {
DebugLogger.error('server', {
console.error('[embedding-worker]', {
type: 'worker.job.failed',
postId: job.postId,
attempt: nextAttempt,
Expand All @@ -205,7 +204,7 @@ const handlePayload = async (rawPayload: string) => {
};

const run = async () => {
DebugLogger.info('server', {
console.info('[embedding-worker]', {
type: 'worker.start',
queueKey,
failedQueueKey,
Expand All @@ -220,7 +219,7 @@ const run = async () => {
await handlePayload(payload);
} catch (error) {
if (shuttingDown) break;
DebugLogger.error('server', {
console.error('[embedding-worker]', {
type: 'worker.loop_error',
message: (error as Error)?.message ?? 'unknown',
});
Expand All @@ -230,7 +229,7 @@ const run = async () => {
}
}

DebugLogger.info('server', { type: 'worker.exit' });
console.info('[embedding-worker]', { type: 'worker.exit' });
};

run().catch((error) => {
Expand Down