-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrag_pipeline.py
More file actions
92 lines (81 loc) · 3.5 KB
/
rag_pipeline.py
File metadata and controls
92 lines (81 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os
from haystack import GeneratedAnswer, Pipeline
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.generators.chat import HuggingFaceAPIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret
from haystack.utils.hf import HFGenerationAPIType
from couchbase_haystack import CouchbasePasswordAuthenticator, CouchbaseSearchDocumentStore, CouchbaseSearchEmbeddingRetriever
# Load HF Token from environment variables.
HF_TOKEN = Secret.from_env_var("HF_API_TOKEN")
# Make sure you have a running couchbase database, e.g. with Docker:
# docker run \
# --restart always \
# --publish=8091-8096:8091-8096 --publish=11210:11210 \
# --env COUCHBASE_ADMINISTRATOR_USERNAME=admin \
# --env COUCHBASE_ADMINISTRATOR_PASSWORD=passw0rd \
# couchbase:enterprise-7.6.2
document_store = CouchbaseSearchDocumentStore(
cluster_connection_string=Secret.from_env_var("CONNECTION_STRING"),
authenticator=CouchbasePasswordAuthenticator(
username=Secret.from_env_var("USER_NAME"), password=Secret.from_env_var("PASSWORD")
),
bucket=os.getenv("BUCKET_NAME"),
scope=os.getenv("SCOPE_NAME"),
collection=os.getenv("COLLECTION_NAME"),
vector_search_index="vector_search",
)
# Build a RAG pipeline with a Retriever to get relevant documents to the query and a HuggingFaceTGIGenerator
# interacting with LLMs using a custom prompt.
prompt_messages = [
ChatMessage.from_system("You are a helpful assistant that answers questions based on the provided documents."),
ChatMessage.from_user(
"""Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:"""
),
]
rag_pipeline = Pipeline()
rag_pipeline.add_component(
"query_embedder",
SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2", progress_bar=False),
)
rag_pipeline.add_component("retriever", CouchbaseSearchEmbeddingRetriever(document_store=document_store))
rag_pipeline.add_component("prompt_builder", ChatPromptBuilder(template=prompt_messages, required_variables=["question"]))
rag_pipeline.add_component(
"llm",
HuggingFaceAPIChatGenerator(
api_type=HFGenerationAPIType.SERVERLESS_INFERENCE_API,
api_params={"model": "mistralai/Mistral-7B-Instruct-v0.2"},
),
)
rag_pipeline.add_component("answer_builder", AnswerBuilder())
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("retriever", "answer_builder.documents")
# Ask a question on the data you just added.
question = "Who created the Dothraki vocabulary?"
result = rag_pipeline.run(
{
"query_embedder": {"text": question},
"retriever": {"top_k": 3},
"prompt_builder": {"question": question},
"answer_builder": {"query": question},
}
)
# For details, like which documents were used to generate the answer, look into the GeneratedAnswer object
answer: GeneratedAnswer = result["answer_builder"]["answers"][0]
# ruff: noqa: T201
print("Query: ", answer.query)
print("Answer: ", answer.data)
print("== Sources:")
for doc in answer.documents:
print("-> ", doc.meta["file_path"])