Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 117 additions & 163 deletions langchain_rag/utils/embedding.py
Original file line number Diff line number Diff line change
@@ -1,183 +1,137 @@
#!/usr/bin/env python3
# embedding.py
"""
3.
S3(또는 로컬) 전처리 텍스트 → 배치 임베딩 → HNSW 인덱스 추가 → JSONL 메타 기록
--start_entries와 --max_entries 옵션으로 처리 범위를 지정할 수 있습니다.
디버깅용으로 파일 처리 번호와 메모리 로그를 모두 출력합니다.
"""

import os
import json
import sys
from __future__ import annotations
import json, os, sys, argparse
from pathlib import Path
from typing import List, Tuple

from typing import Iterable, List

import faiss
import numpy as np
from dotenv import load_dotenv
from sklearn.preprocessing import normalize
from langchain_community.embeddings import HuggingFaceEmbeddings # KoSimCSE
# from sentence_transformers import SentenceTransformer #gte
import psutil

from langchain_community.embeddings import HuggingFaceEmbeddings
from local_storage import LocalStorage

# 프로젝트 루트 디렉토리를 Python 경로에 추가
CURRENT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = CURRENT_DIR.parent.parent # utils의 상위 디렉토리의 상위 디렉토리
sys.path.append(str(PROJECT_ROOT))

# store_vector.py에서 필요한 함수들 import
from store_vector import create_hnsw_index, save_metadata

# 환경 변수 로드
load_dotenv()
HNSW_INDEX_PATH = Path(os.getenv("HNSW_INDEX_PATH", "data/embedding_data/hnsw.index"))
METADATA_PATH = HNSW_INDEX_PATH.with_suffix(".meta.jsonl")


# 기본 HNSW 인덱스 저장 경로 (환경변수로 재정의 가능)
DEFAULT_HNSW_PATH = Path(
os.getenv("HNSW_INDEX_PATH", "data/embedding_data/hnsw.index")
)
# 기본 메타데이터 저장 경로
DEFAULT_META_PATH = Path(
os.getenv("METADATA_SAVE_PATH", "data/embedding_data/faiss_metadata.json")
)
def log_mem(stage: str):
m = psutil.Process(os.getpid()).memory_info().rss / (1024**2)
print(f"[MEMORY] {stage}: {m:.2f} MiB")

class EmbedFromS3:
def __init__(
self,
folder_path: str = "pre_processed_data/", # S3 폴더 경로
model_name: str = "BM-K/KoSimCSE-roberta", # 임베딩 모델명
# model_name: str = "thenlper/gte-base",
batch_size: int = 64 # 배치 크기
folder_path: str,
model_name: str,
batch_size: int,
start_entries: int,
max_entries: int | None,
m: int,
ef_construction: int,
ef_search: int,
):
self.folder_path = folder_path
self.batch_size = batch_size
self.localstorage = LocalStorage()
self._model = HuggingFaceEmbeddings(model_name = model_name)
# self._model = SentenceTransformer(model_name)

# ──────────────────────────
# 임베딩된 내용 vector store에 저장
# ──────────────────────────
def _save_to_vector_store(self, texts: List[str], vectors: np.ndarray) -> None:
"""
벡터와 메타데이터를 vector_db에 저장
"""
create_hnsw_index(
texts=texts,
vectors=vectors
)

# ──────────────────────────
# raw 데이터 임베딩
# ──────────────────────────
def _embed_texts(self, texts: List[str]) -> np.ndarray:
"""
주어진 텍스트 리스트를 임베딩한 뒤 L2 정규화하여 반환
"""
embs = self._model.embed_documents(texts)
# embs = self._model.encode(texts, show_progress_bar=False) # gte (sentence_transformers)
return normalize(np.asarray(embs, dtype="float32"), norm="l2")

# ──────────────────────────
# 1. 단일 파일 임베딩
# ──────────────────────────
def embed_file(self, file_path: str) -> Tuple[List[str], np.ndarray]:
"""
1) 단일 JSON/TXT 파일에서 텍스트 추출
2) 임베딩 벡터 생성 + 정규화
3) HNSW 인덱스와 메타데이터 저장
"""
# 1) 텍스트 읽기
texts = (
self.localstorage.get_all_places_from_json(file_path)
if file_path.lower().endswith(".json")
else self.localstorage.get_all_places_from_txt(file_path)
)
# 2) 임베딩
vectors = self._embed_texts(texts)

# 3) HNSW 인덱스 저장
self._save_to_vector_store(texts, vectors)

return texts, vectors

# ──────────────────────────
# 2. 앞에서 k개 파일 임베딩
# ──────────────────────────
def embed_k_files(self, k: int) -> Tuple[List[str], np.ndarray]:
"""
1) S3에서 첫 k개 파일 가져오기
2) 순차 임베딩 + 정규화
3) HNSW 인덱스와 메타데이터 저장
"""
keys = self.localstorage.list_first_n_files(self.folder_path, k)
all_texts: List[str] = []
print(f"[DEBUG] embed_k_files 시작: 처리할 파일 수 = {len(keys)}")
for idx, key in enumerate(keys, start=1):
print(f"[DEBUG] ({idx}/{len(keys)}) 파일 읽는 중: {key}")
# 파일별 텍스트 추출
texts = self.localstorage.get_all_places_from_json(key)
print(f"[DEBUG] -> 추출된 텍스트 개수: {len(texts)}")
all_texts.extend(texts)

print(f"[DEBUG] 전체 텍스트 개수: {len(all_texts)} — 임베딩 시작")
# 임베딩
vectors = self._embed_texts(all_texts)
print(f"[DEBUG] 임베딩 완료: 벡터 shape = {vectors.shape}")

# HNSW 인덱스 저장
self._save_to_vector_store(all_texts, vectors)
print(f"[DEBUG] 인덱스 및 메타데이터 저장 완료")

return all_texts, vectors

# ──────────────────────────
# 3. 전체 폴더 임베딩
# ──────────────────────────
def embed_all(self) -> Tuple[List[str], np.ndarray]:
"""
1) S3 폴더 내 모든 파일 목록 조회
2) 파일별로 배치 단위 임베딩 + 정규화
3) HNSW 인덱스와 메타데이터 저장
"""
log_mem("init start")
self.folder_path = folder_path
self.batch_size = batch_size
self.start_entries = start_entries
self.max_entries = max_entries
self.localstorage = LocalStorage()
self._model = HuggingFaceEmbeddings(model_name=model_name)
self._index: faiss.IndexHNSWFlat | None = None
self._m = m
self._ef_construction = ef_construction
self._ef_search = ef_search
log_mem("init complete")

def _yield_text_batches(self) -> Iterable[List[str]]:
file_list = self.localstorage.list_files_in_folder(self.folder_path)
all_texts: List[str] = []
print(f"[DEBUG] embed_all 시작: 폴더 내 파일 수 = {len(file_list)}")
sent_count = 0

for idx, fp in enumerate(file_list, start=1):
print(f"[DEBUG] ({idx}/{len(file_list)}) 파일 읽는 중: {fp}")
# 전체 파일 텍스트 추출
if fp.lower().endswith(".txt"):
texts = self.localstorage.get_all_places_from_predata(fp)

print(f"[DEBUG] -> 추출된 텍스트 개수: {len(texts)}")
all_texts.extend(texts)

print(f"[DEBUG] 전체 텍스트 개수: {len(all_texts)} — 임베딩 시작")
# 임베딩
vectors = self._embed_texts(all_texts)
print(f"[DEBUG] 임베딩 완료: 벡터 shape = {vectors.shape}")

# HNSW 인덱스 & 메타데이터 저장
self._save_to_vector_store(all_texts, vectors)
print(f"[DEBUG] 인덱스 및 메타데이터 저장 완료")

return all_texts, vectors

# ────────────────────────
# CLI 테스트
# ────────────────────────
if not fp.lower().endswith(".txt"):
continue
texts = self.localstorage.get_all_places_from_predata(fp)
print(f" • ({idx}/{len(file_list)}) {fp}: {len(texts)}개 문장")

for i in range(0, len(texts), self.batch_size):
batch = texts[i : i + self.batch_size]
sent_count += len(batch)
# skipping already processed
if sent_count <= self.start_entries:
continue
# stop if exceeded
if self.max_entries is not None and sent_count > self.max_entries:
return
yield batch

def build_hnsw_stream(self) -> None:
total = 0
METADATA_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(METADATA_PATH, "w", encoding="utf-8") as meta_f:
for batch_texts in self._yield_text_batches():
log_mem("before embed_documents")
vecs = self._model.embed_documents(batch_texts)
log_mem("after embed_documents")

vecs = normalize(np.asarray(vecs, dtype="float32"), norm="l2")
log_mem("after normalize")

if self._index is None:
dim = vecs.shape[1]
self._index = faiss.IndexHNSWFlat(dim, self._m)
self._index.hnsw.efConstruction = self._ef_construction
self._index.hnsw.efSearch = self._ef_search
print(f"[INFO] HNSW 초기화: dim={dim}")
log_mem("after HNSW init")

log_mem("before index.add")
self._index.add(vecs)
log_mem("after index.add")

for t in batch_texts:
meta_f.write(json.dumps(t, ensure_ascii=False) + "\n")

total += len(batch_texts)
log_mem("after cleanup")

log_mem("before write_index")
HNSW_INDEX_PATH.parent.mkdir(parents=True, exist_ok=True)
faiss.write_index(self._index, str(HNSW_INDEX_PATH))
log_mem("after write_index")
print(f"[DONE] 인덱스 및 메타데이터 저장 완료 (총 {total} vectors)")


if __name__ == "__main__":
import argparse

# parser = argparse.ArgumentParser()
# group = parser.add_mutually_exclusive_group(required=True)
# group.add_argument("--file", help="단일 S3 키 지정")
# group.add_argument("--n", type=int, metavar="N", help="첫 N개 파일 처리")
# group.add_argument("--all", action="store_true", help="폴더 전체 처리")
# args = parser.parse_args()

emb = EmbedFromS3()
texts, vecs = emb.embed_all()
# if args.file:
# texts, vecs = emb.embed_file(args.file)
# elif args.n is not None:
# texts, vecs = emb.embed_k_files(args.n)
# else:
# texts, vecs = emb.embed_all()

print(f" 임베딩 및 저장 완료 • 총 {len(texts)}개 • shape={vecs.shape}")
parser = argparse.ArgumentParser()
parser.add_argument("--folder_path", default="pre_processed_data/")
parser.add_argument("--model_name", default="BM-K/KoSimCSE-roberta")
parser.add_argument("--batch_size", type=int, default=64)
parser.add_argument("--start_entries", type=int, default=0, help="건너뛸 문장 수")
parser.add_argument("--max_entries", type=int, default=None, help="처리할 최대 문장 수")
parser.add_argument("--m", type=int, default=32)
parser.add_argument("--ef_construction", type=int, default=40)
parser.add_argument("--ef_search", type=int, default=16)
args = parser.parse_args()

EmbedFromS3(
folder_path=args.folder_path,
model_name=args.model_name,
batch_size=args.batch_size,
start_entries=args.start_entries,
max_entries=args.max_entries,
m=args.m,
ef_construction=args.ef_construction,
ef_search=args.ef_search,
).build_hnsw_stream()

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
주소 : 대한민국 제주특별자치도 서귀포시 대정읍 신도리 2890번지
위도, 경도 : 33.2868223, 126.1727945
이름 : 오르간파이프
전체 평점 : 4.8
타입 : home_goods_store, store, point_of_interest, establishment
한 줄 요약 리뷰: 다양한 종류의 다육식물을 구할 수 있고 가격이 합리적이며, 판매점의 친절함도 좋다는 긍정적인 리뷰가 많습니다.

주소 : 대한민국 제주시
위도, 경도 : 33.28752, 126.16779
이름 : 한장동
전체 평점 : 평점 없음
타입 : transit_station, point_of_interest, establishment
한 줄 요약 리뷰: 리뷰 없음

주소 : 대한민국 제주특별자치도 제주시 특별자치도, 한경면 고산리 3918번지 KR
위도, 경도 : 33.285573, 126.1667494
이름 : 대해양식
전체 평점 : 3
타입 : restaurant, food, point_of_interest, establishment
한 줄 요약 리뷰: 리뷰 없음

주소 : 대한민국 제주특별자치도 제주시 특별자치도, 한경면 고산리 3134-11번지
위도, 경도 : 33.29269349999999, 126.1780417
이름 : (주)협성
전체 평점 : 평점 없음
타입 : point_of_interest, establishment
한 줄 요약 리뷰: 리뷰 없음

주소 : 대한민국 제주특별자치도 제주시 한경면 고산리 3987
위도, 경도 : 33.2835332, 126.1678205
이름 : 보라매수산
전체 평점 : 3.6
타입 : point_of_interest, establishment
한 줄 요약 리뷰: 이곳은 광어 앙식장으로 인기가 많고 경치가 좋으며, 시기와 조건에 따라 낚시나 보말잡기를 즐길 수 있는 장소이지만, 규모가 넓어 인원 한정으로 운영되며, 일부 관광객은

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
주소 : 대한민국 제주시
위도, 경도 : 33.294876, 126.204179
이름 : 전답동
전체 평점 : 평점 없음
타입 : transit_station, point_of_interest, establishment
한 줄 요약 리뷰: 리뷰 없음

주소 : 대한민국 제주시
위도, 경도 : 33.295062, 126.203939
이름 : 고산2리 전답동
전체 평점 : 평점 없음
타입 : transit_station, point_of_interest, establishment
한 줄 요약 리뷰: 리뷰 없음

Loading