|
1 | | - # 질문 → 답변 API (API endpoint) |
2 | | -# 질문을 DB에 채움 |
| 1 | +# 강제 수동 실행용 (Change Stream 대신) |
| 2 | +# API 트리거 질문 -> 응답 처리 가능 |
| 3 | + |
| 4 | +""" |
| 5 | +1. (Change Stream 또는 직접 호출 시) qa_queries 문서 전체 읽기 |
| 6 | +2. 각 질문(request_id) 확인: |
| 7 | + queries_embedding에 없으면 → 새 질문 처리 시작 |
| 8 | +3. queries_embedding 생성 (label, polarity 포함) |
| 9 | +4. qa_answers에 같은 store_id + menu_id + label + polarity 답변이 있으면: |
| 10 | + qa_answers.created_at < reviews_denorm.updated_at → 새로 생성 |
| 11 | + 아니면 재사용 |
| 12 | +5. 최종 답변 qa_answers에 저장 |
| 13 | +""" |
| 14 | + |
3 | 15 | from fastapi import APIRouter |
4 | | -from app.services.rag_service import run_rag |
| 16 | +from datetime import datetime |
| 17 | +from app.db.mongodb import get_collection |
| 18 | +from app.services.embedding_service import embed_and_label_question |
| 19 | +from app.services.rag_service import generate_answer_from_reviews |
5 | 20 |
|
6 | 21 | router = APIRouter() |
7 | 22 |
|
8 | | -@router.post("/ask/{request_id}") |
9 | | -async def ask_question(request_id: str): |
10 | | - return await run_rag(request_id) |
| 23 | +qa_queries_col = get_collection("qa_queries") |
| 24 | +queries_embedding_col = get_collection("queries_embedding") |
| 25 | + |
| 26 | +@router.get("/process-queries") |
| 27 | +async def process_queries(limit: int = 10): |
| 28 | + """ |
| 29 | + 수동으로 QA 파이프라인 실행 (Change Stream 대신 직접 확인할 때 사용) |
| 30 | + """ |
| 31 | + results = [] |
| 32 | + docs = qa_queries_col.find().limit(limit) |
| 33 | + |
| 34 | + for doc in docs: |
| 35 | + store_id = doc["_id"] |
| 36 | + store_name = doc["store_name"] |
| 37 | + |
| 38 | + for menu in doc.get("menus", []): |
| 39 | + menu_id = menu["menu_id"] |
| 40 | + menu_name = menu["menu_name"] |
| 41 | + |
| 42 | + for q in menu.get("questions", []): |
| 43 | + request_id = q["request_id"] |
| 44 | + question = q["question"] |
| 45 | + |
| 46 | + # 이미 처리된 질문이면 skip |
| 47 | + queries_doc = queries_embedding_col.find_one({"_id": store_id}) |
| 48 | + existing_ids = [] |
| 49 | + if queries_doc: |
| 50 | + for m in queries_doc["menus"]: |
| 51 | + if m["menu_id"] == menu_id: |
| 52 | + existing_ids = [qe["request_id"] for qe in m.get("questions_embedding", [])] |
| 53 | + if request_id in existing_ids: |
| 54 | + continue |
| 55 | + |
| 56 | + # 질문 라벨링 + 임베딩 |
| 57 | + label, polarity, embedding = embed_and_label_question(question) |
| 58 | + |
| 59 | + queries_embedding_col.update_one( |
| 60 | + {"_id": store_id, "menus.menu_id": menu_id}, |
| 61 | + { |
| 62 | + "$push": { |
| 63 | + "menus.$.questions_embedding": { |
| 64 | + "request_id": request_id, |
| 65 | + "question": question, |
| 66 | + "label": label, |
| 67 | + "polarity": polarity, |
| 68 | + "embedding": embedding, |
| 69 | + "created_at": datetime.utcnow() |
| 70 | + } |
| 71 | + }, |
| 72 | + "$set": {"updated_at": datetime.utcnow(), "store_name": store_name} |
| 73 | + }, |
| 74 | + upsert=True |
| 75 | + ) |
| 76 | + |
| 77 | + # RAG 실행 (라벨 맞는 리뷰 통계로 응답 생성) |
| 78 | + answer_result = generate_answer_from_reviews(store_id, menu_id, question) |
| 79 | + |
| 80 | + results.append({ |
| 81 | + "request_id": request_id, |
| 82 | + "answer": answer_result.get("answer"), |
| 83 | + "reviews_used": answer_result.get("reviews_used", []) |
| 84 | + }) |
| 85 | + |
| 86 | + return {"processed": len(results), "results": results} |
0 commit comments