Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 27 additions & 57 deletions learn2rag/pipeline/app.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
import asyncio
import json
import logging
import secrets
from collections.abc import Coroutine
from operator import itemgetter
from concurrent.futures import ThreadPoolExecutor
from typing import Any, AsyncGenerator, Generator, List, Optional
from typing import (
Any,
AsyncGenerator,
List,
Optional,
)

from fastapi import FastAPI, Body, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from qdrant_client.models import ScoredPoint

from . import generate
from . import ingestion
from .config import user_config, opt_config
from .qdrant import Qdrant
from .search import search_authorized
from .operators import BasicPipeline
from .operators.base import BaseOperator

pipeline: BaseOperator = BasicPipeline()


class QuestionInput(BaseModel):
Expand All @@ -38,11 +44,10 @@ class TestResponse(BaseModel):
message: str

async def simple_chatbot_response(input: QuestionInput) -> Any:
results = await search_authorized(question=input.question, user=input.user)
# sources = "\n".join(set(result.payload['path'] for result in results))
answer = generate.generate(input.question, results, opt_config)
# full_response = f"{answer}\n\n{sources}"
return answer # full_response
return itemgetter('answer')(await pipeline(inputs={
'question': input.question,
'user': input.user,
}))


example_query = "What approach did Arjun Singh's campaign use to respond to voters' concerns on social media platforms during the municipal elections in Delhi?"
Expand Down Expand Up @@ -117,56 +122,22 @@ async def chat_completions(
return await simple_response(inputs)


async def pipeline(inputs: ChatState) -> list[str]:
request_id = secrets.token_hex()
question = inputs.messages[-1].content

results = await search_authorized(user=inputs.user, question=question, request_id=request_id)
# sources = "\n".join(set(result.payload['path'] for result in results))

executor = ThreadPoolExecutor()
loop = asyncio.get_event_loop()

def sync_gen() -> Generator[str, Any, None]:
for chunk in generate.generate_stream(question, results, opt_config, request_id=request_id):
yield chunk
async def run_pipeline(chat_state: ChatState) -> Any:
if not chat_state.user:
raise ValueError("User Missing")

chunks = await loop.run_in_executor(executor, lambda: list(sync_gen()))
return chunks
return await pipeline(inputs={
'question': chat_state.messages[-1].content,
'user': chat_state.user,
})


async def event_stream(inputs: ChatState) -> AsyncGenerator[Any, Any]:
try:
if not inputs.user:
raise ValueError("User Missing")
answer = itemgetter('answer')(await run_pipeline(inputs))

chunks = await pipeline(inputs)

yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': None}]})}\n\n"

for chunk in chunks:
msg = {
"choices": [
{
"delta": {"content": chunk},
"finish_reason": None
}
]
}
yield f"data: {json.dumps(msg)}\n\n"
# await asyncio.sleep(0.1) # delay for stream check

# msg = {
# "choices": [
# {
# "delta": {"content": "\n\n" + sources},
# "finish_reason": None
# }
# ]
# }
# yield f"data: {json.dumps(msg)}\n\n"

yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
delta = {'content': answer}
yield f"data: {json.dumps({'choices': [{'delta': delta, 'finish_reason': 'stop'}]})}\n\n"
except Exception as e:
logging.error('%s: %s', e.__class__, e)
content = 'There is a problem with Learn2RAG configuration. Please contact your administrator.' # FIXME
Expand All @@ -175,14 +146,13 @@ async def event_stream(inputs: ChatState) -> AsyncGenerator[Any, Any]:


async def simple_response(inputs: ChatState) -> JSONResponse:
if not inputs.user:
raise ValueError("User Missing")
answer = itemgetter('answer')(await run_pipeline(inputs))

return JSONResponse({
'choices': [
{
'message': {
'content': ''.join(await pipeline(inputs)),
'content': answer,
'role': 'assistant',
},
'finish_reason': 'stop',
Expand Down
7 changes: 1 addition & 6 deletions learn2rag/pipeline/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from .llm import llm


profilingLogger = logging.getLogger('profiling')

context_template ="""
-----
Source: {source}
Expand All @@ -27,8 +25,7 @@ def generate(query: str, search_results: list[ScoredPoint], opt_config: dict[str
return answer.content


def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: dict[str, Any], request_id: str | None=None) -> Generator[str, None, None]:
profilingLogger.info('start', extra={'activity': 'generate', 'request_id': request_id})
def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: dict[str, Any]) -> Generator[str, None, None]:
assert llm is not None

if hasattr(search_results, "points"):
Expand All @@ -44,5 +41,3 @@ def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: d
text_chunk = chunk.text()
if text_chunk:
yield text_chunk

profilingLogger.info('end', extra={'activity': 'generate', 'request_id': request_id})
23 changes: 15 additions & 8 deletions learn2rag/pipeline/main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import asyncio
import logging
import logging.config
import yaml
import asyncio
from operator import itemgetter

from langchain_core.documents.base import Document

from . import ingestion
from . import search
from . import generate
from .operators import BasicPipeline
from .store import delete_collection, delete_documents, get_documents, update_documents


if __name__ == "__main__":
async def main() -> None:
try:
logging.config.dictConfig(yaml.safe_load(open("./learn2rag/pipeline/logging.yaml").read()))
except FileNotFoundError:
Expand Down Expand Up @@ -62,22 +65,26 @@
if opt_config["query_mode"] == "multi":
# in query_mode 'multi' different querys for each vector in the multi-vector are allowed
multi_query = {"content": "What is USM AI?", "title": "What is USM AI?", "summary": "What is USM AI?", "source_path":"USU/ITSM/"}
results = search.search_multi(multi_query, user_config, opt_config, request_id=None)
results = search.search_multi(multi_query, user_config, opt_config)
points = results.points
# modify the query for generation part
query = " ".join(f"{k}={v}" for k, v in multi_query.items())
answer = generate.generate(query, points, opt_config)
else:
pipeline = BasicPipeline()
query = "Was sind A, B und C?"
user = "anonymous"
points = asyncio.run(search.search_authorized(query, user, request_id=None))
answer, points = itemgetter('answer', 'documents')(await pipeline(
inputs={'question': query, 'user': 'anonymous'},
))

sources = set(point.payload['source'] for point in points) # type: ignore[index]
sources = "\n".join(set(point.payload['path'] for point in points)) # type: ignore[index]

for point in points:
print(f"ID: {point.id}, Path: {point.payload['source']}, Score: {point.score}") # type: ignore[index]

answer = generate.generate(query, points, opt_config)

print(query)
print(answer)
print(sources)

if __name__ == "__main__":
asyncio.run(main())
7 changes: 7 additions & 0 deletions learn2rag/pipeline/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .basic_pipeline import BasicPipeline
from .example_pipeline import ExamplePipeline

__all__ = [
'BasicPipeline',
'ExamplePipeline',
]
30 changes: 30 additions & 0 deletions learn2rag/pipeline/operators/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import abc
import logging
import time
from typing import Any

from ..prov import Activity, Prov

profilingLogger = logging.getLogger('profiling')


class BaseOperator(abc.ABC):
    """Abstract base for pipeline operators.

    Calling an instance wraps the subclass-provided ``run`` coroutine with
    'start'/'end' profiling log entries and appends an ``Activity`` to the
    provenance record.
    """

    async def __call__(self, inputs: Any, prov: Prov | None = None) -> Any:
        """Execute ``run`` and record timing information for it.

        Args:
            inputs: Operator-specific input, passed through to ``run``.
            prov: Provenance record to log against and append to. A new
                ``Prov`` is created when it is omitted.

        Returns:
            Whatever ``run`` returns.

        Note:
            There is no exception handling here: if ``run`` raises, the
            'end' entry is not logged and no ``Activity`` is appended.
        """
        record = Prov() if prov is None else prov
        activity_name = self.__class__.__name__

        began = time.time()
        profilingLogger.info(
            'start',
            extra={'activity': activity_name, 'request_id': record.id},
        )
        result = await self.run(inputs=inputs, prov=record)
        finished = time.time()
        profilingLogger.info(
            'end',
            extra={'activity': activity_name, 'request_id': record.id},
        )

        # The activity is recorded only after a successful run.
        activity = Activity(
            label=activity_name,
            startedAtTime=began,
            endedAtTime=finished,
        )
        record.append(activity)
        return result

    @abc.abstractmethod
    async def run(self, inputs: Any, prov: Prov) -> Any:
        """Perform the operator's work; subclasses must implement this."""
39 changes: 39 additions & 0 deletions learn2rag/pipeline/operators/basic_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from operator import itemgetter
from typing import Any, TypedDict

from ..prov import Prov
from .base import BaseOperator
from .search import SearchOperator
from .generation import GenerationOperator

Inputs = TypedDict('Inputs', {
'question': str,
'user': str | None,
}, total=True)

Outputs = TypedDict('Outputs', {
'answer': str,
'documents': Any,
}, total=True)


class BasicPipeline(BaseOperator):
    """Pipeline that searches for documents and then generates an answer."""

    async def run(self, inputs: Inputs, prov: Prov) -> Outputs:
        """Run the search step followed by the generation step.

        Args:
            inputs: Mapping with the 'question' and the 'user'.
            prov: Provenance record handed on to both sub-operators.

        Returns:
            Mapping with the generated 'answer' and the 'documents' that the
            search step returned.
        """
        # Step 1: retrieve documents for the question on behalf of the user.
        search = SearchOperator()
        search_outputs = await search(
            inputs={
                'question': inputs['question'],
                'user': inputs['user'],
            },
            prov=prov,
        )
        documents = search_outputs['documents']

        # Step 2: generate the answer from the question and those documents.
        generation = GenerationOperator()
        generation_outputs = await generation(
            inputs={
                'question': inputs['question'],
                'documents': documents,
            },
            prov=prov,
        )
        answer = generation_outputs['answer']

        return {'answer': answer, 'documents': documents}
25 changes: 25 additions & 0 deletions learn2rag/pipeline/operators/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from typing import TypedDict
import logging

from ..prov import Prov
from .base import BaseOperator

logger = logging.getLogger(__name__)

class Inputs(TypedDict, total=True):
    """Input mapping accepted by ``ExampleOperator.run``."""

    question: str


class Outputs(TypedDict, total=True):
    """Result mapping returned by ``ExampleOperator.run``."""

    value: int


class ExampleOperator(BaseOperator):
    """Demonstration operator that logs facts about the provenance record."""

    async def run(self, inputs: Inputs, prov: Prov) -> Outputs:
        """Log summary information about ``prov.items`` and return a constant.

        Args:
            inputs: Mapping with the 'question'; it is not read here.
            prov: Provenance record whose ``items`` are inspected.

        Returns:
            Mapping whose 'value' entry is always 42.
        """
        steps = prov.items
        logger.info('Amount of steps: %i', len(steps))

        # BaseOperator.__call__ appends an operator's Activity only after its
        # run() has returned, so an operator that runs first has no previous
        # step to report. Indexing [-1] unconditionally would raise IndexError.
        # NOTE(review): assumes `items` is a sequence that is empty on a fresh
        # Prov - confirm against the prov module.
        last_label = steps[-1].label if steps else None
        logger.info('Last step: %s', last_label)

        search_steps = sum(1 for item in steps if item.label == 'SearchOperator')
        logger.info('Amount of search steps: %i', search_steps)

        return {'value': 42}
41 changes: 41 additions & 0 deletions learn2rag/pipeline/operators/example_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from operator import itemgetter
from typing import Any, TypedDict

from ..prov import Prov
from .base import BaseOperator
from .example import ExampleOperator
from .search import SearchOperator
from .generation import GenerationOperator

Inputs = TypedDict('Inputs', {
'question': str,
'user': str | None,
}, total=True)

Outputs = TypedDict('Outputs', {
'answer': str,
'documents': Any,
}, total=True)


class ExamplePipeline(BaseOperator):
    """Search-then-generate pipeline that also runs ``ExampleOperator``."""

    async def run(self, inputs: Inputs, prov: Prov) -> Outputs:
        """Run search, generation and then the example operator.

        Args:
            inputs: Mapping with the 'question' and the 'user'.
            prov: Provenance record handed on to every sub-operator.

        Returns:
            Mapping with the generated 'answer' and the 'documents' that the
            search step returned. The example operator's result is discarded.
        """
        # Step 1: retrieve documents for the question on behalf of the user.
        search = SearchOperator()
        search_outputs = await search(
            inputs={
                'question': inputs['question'],
                'user': inputs['user'],
            },
            prov=prov,
        )
        documents = search_outputs['documents']

        # Step 2: generate the answer from the question and those documents.
        generation = GenerationOperator()
        generation_outputs = await generation(
            inputs={
                'question': inputs['question'],
                'documents': documents,
            },
            prov=prov,
        )
        answer = generation_outputs['answer']

        # Step 3: run the example operator; its return value is not used.
        example = ExampleOperator()
        await example(inputs={'question': inputs['question']}, prov=prov)

        return {'answer': answer, 'documents': documents}
23 changes: 23 additions & 0 deletions learn2rag/pipeline/operators/generation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Any, TypedDict

from .base import BaseOperator
from ..prov import Prov
from ..config import opt_config
from ..generate import generate

class Inputs(TypedDict, total=True):
    """Input mapping accepted by ``GenerationOperator.run``."""

    # Passed to ``generate`` as its first argument.
    question: str
    # Passed to ``generate`` as its second argument.
    documents: Any


class Outputs(TypedDict, total=True):
    """Result mapping returned by ``GenerationOperator.run``."""

    # The value returned by ``generate``.
    answer: str


class GenerationOperator(BaseOperator):
    """Operator that produces an answer from a question and documents."""

    async def run(self, inputs: Inputs, prov: Prov) -> Outputs:
        """Generate an answer without blocking the event loop.

        Args:
            inputs: Mapping with the 'question' and the 'documents' that are
                passed to ``generate``.
            prov: Provenance record; not used by this operator itself.

        Returns:
            Mapping whose 'answer' entry is the value returned by
            ``generate``.
        """
        # Function-scope import keeps this block self-contained.
        import asyncio

        # ``generate`` is a plain synchronous function. Calling it directly
        # inside this coroutine would stall every other request served by the
        # same event loop until it returns, so it is run in a worker thread.
        # NOTE(review): assumes ``generate`` is safe to call from a worker
        # thread - confirm for the configured LLM client.
        answer = await asyncio.to_thread(
            generate,
            inputs['question'],
            inputs['documents'],
            opt_config,
        )
        return {'answer': answer}
Loading
Loading