diff --git a/learn2rag/pipeline/app.py b/learn2rag/pipeline/app.py index c5f0b78..0fa9148 100644 --- a/learn2rag/pipeline/app.py +++ b/learn2rag/pipeline/app.py @@ -1,10 +1,13 @@ -import asyncio import json import logging -import secrets -from collections.abc import Coroutine +from operator import itemgetter from concurrent.futures import ThreadPoolExecutor -from typing import Any, AsyncGenerator, Generator, List, Optional +from typing import ( + Any, + AsyncGenerator, + List, + Optional, +) from fastapi import FastAPI, Body, Request, status from fastapi.exceptions import RequestValidationError @@ -12,11 +15,14 @@ from pydantic import BaseModel from qdrant_client.models import ScoredPoint -from . import generate from . import ingestion from .config import user_config, opt_config from .qdrant import Qdrant from .search import search_authorized +from .operators import BasicPipeline +from .operators.base import BaseOperator + +pipeline: BaseOperator = BasicPipeline() class QuestionInput(BaseModel): @@ -38,11 +44,10 @@ class TestResponse(BaseModel): message: str async def simple_chatbot_response(input: QuestionInput) -> Any: - results = await search_authorized(question=input.question, user=input.user) - # sources = "\n".join(set(result.payload['path'] for result in results)) - answer = generate.generate(input.question, results, opt_config) - # full_response = f"{answer}\n\n{sources}" - return answer # full_response + return itemgetter('answer')(await pipeline(inputs={ + 'question': input.question, + 'user': input.user, + })) example_query = "What approach did Arjun Singh's campaign use to respond to voters' concerns on social media platforms during the municipal elections in Delhi?" @@ -117,56 +122,22 @@ async def chat_completions( return await simple_response(inputs) -async def pipeline(inputs: ChatState) -> list[str]: - request_id = secrets.token_hex() - question = inputs.messages[-1].content - - results = await search_authorized(user=inputs.user, question=question, request_id=request_id) - # sources = "\n".join(set(result.payload['path'] for result in results)) - - executor = ThreadPoolExecutor() - loop = asyncio.get_event_loop() - - def sync_gen() -> Generator[str, Any, None]: - for chunk in generate.generate_stream(question, results, opt_config, request_id=request_id): - yield chunk +async def run_pipeline(chat_state: ChatState) -> Any: + if not chat_state.user: + raise ValueError("User Missing") - chunks = await loop.run_in_executor(executor, lambda: list(sync_gen())) - return chunks + return await pipeline(inputs={ + 'question': chat_state.messages[-1].content, + 'user': chat_state.user, + }) async def event_stream(inputs: ChatState) -> AsyncGenerator[Any, Any]: try: - if not inputs.user: - raise ValueError("User Missing") + answer = itemgetter('answer')(await run_pipeline(inputs)) - chunks = await pipeline(inputs) - - yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': None}]})}\n\n" - - for chunk in chunks: - msg = { - "choices": [ - { - "delta": {"content": chunk}, - "finish_reason": None - } - ] - } - yield f"data: {json.dumps(msg)}\n\n" - # await asyncio.sleep(0.1) # delay for stream check - - # msg = { - # "choices": [ - # { - # "delta": {"content": "\n\n" + sources}, - # "finish_reason": None - # } - # ] - # } - # yield f"data: {json.dumps(msg)}\n\n" - - yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': 'stop'}]})}\n\n" + delta = {'content': answer} + yield f"data: {json.dumps({'choices': [{'delta': delta, 'finish_reason': 'stop'}]})}\n\n" except Exception as e: logging.error('%s: %s', e.__class__, e) content = 'There is a problem with Learn2RAG configuration. Please contact your administrator.' # FIXME @@ -175,14 +146,13 @@ async def event_stream(inputs: ChatState) -> AsyncGenerator[Any, Any]: async def simple_response(inputs: ChatState) -> JSONResponse: - if not inputs.user: - raise ValueError("User Missing") + answer = itemgetter('answer')(await run_pipeline(inputs)) return JSONResponse({ 'choices': [ { 'message': { - 'content': ''.join(await pipeline(inputs)), + 'content': answer, 'role': 'assistant', }, 'finish_reason': 'stop', diff --git a/learn2rag/pipeline/generate.py b/learn2rag/pipeline/generate.py index cb913a6..aa47fbf 100644 --- a/learn2rag/pipeline/generate.py +++ b/learn2rag/pipeline/generate.py @@ -5,8 +5,6 @@ from .llm import llm -profilingLogger = logging.getLogger('profiling') - context_template =""" ----- Source: {source} @@ -27,8 +25,7 @@ def generate(query: str, search_results: list[ScoredPoint], opt_config: dict[str return answer.content -def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: dict[str, Any], request_id: str | None=None) -> Generator[str, None, None]: - profilingLogger.info('start', extra={'activity': 'generate', 'request_id': request_id}) +def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: dict[str, Any]) -> Generator[str, None, None]: assert llm is not None if hasattr(search_results, "points"): @@ -44,5 +41,3 @@ def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: d text_chunk = chunk.text() if text_chunk: yield text_chunk - - profilingLogger.info('end', extra={'activity': 'generate', 'request_id': request_id}) diff --git a/learn2rag/pipeline/main.py b/learn2rag/pipeline/main.py index a6c90e5..0e2505b 100755 --- a/learn2rag/pipeline/main.py +++ b/learn2rag/pipeline/main.py @@ -1,16 +1,19 @@ +import asyncio +import logging import logging.config import yaml -import asyncio +from operator import itemgetter from langchain_core.documents.base import Document from . import ingestion from . import search from . import generate +from .operators import BasicPipeline from .store import delete_collection, delete_documents, get_documents, update_documents -if __name__ == "__main__": +async def main() -> None: try: logging.config.dictConfig(yaml.safe_load(open("./learn2rag/pipeline/logging.yaml").read())) except FileNotFoundError: @@ -62,22 +65,26 @@ if opt_config["query_mode"] == "multi": # in query_mode 'multi' different querys for each vector in the multi-vector are allowed multi_query = {"content": "What is USM AI?", "title": "What is USM AI?", "summary": "What is USM AI?", "source_path":"USU/ITSM/"} - results = search.search_multi(multi_query, user_config, opt_config, request_id=None) + results = search.search_multi(multi_query, user_config, opt_config) points = results.points # modify the query for generation part query = " ".join(f"{k}={v}" for k, v in multi_query.items()) + answer = generate.generate(query, points, opt_config) else: + pipeline = BasicPipeline() query = "Was sind A, B und C?" - user = "anonymous" - points = asyncio.run(search.search_authorized(query, user, request_id=None)) + answer, points = itemgetter('answer', 'documents')(await pipeline( + inputs={'question': query, 'user': 'anonymous'}, + )) - sources = set(point.payload['source'] for point in points) # type: ignore[index] + sources = "\n".join(set(point.payload['path'] for point in points)) # type: ignore[index] for point in points: print(f"ID: {point.id}, Path: {point.payload['source']}, Score: {point.score}") # type: ignore[index] - answer = generate.generate(query, points, opt_config) - print(query) print(answer) print(sources) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/learn2rag/pipeline/operators/__init__.py b/learn2rag/pipeline/operators/__init__.py new file mode 100644 index 0000000..06a2cfa --- /dev/null +++ b/learn2rag/pipeline/operators/__init__.py @@ -0,0 +1,7 @@ +from .basic_pipeline import BasicPipeline +from .example_pipeline import ExamplePipeline + +__all__ = [ + 'BasicPipeline', + 'ExamplePipeline', +] diff --git a/learn2rag/pipeline/operators/base.py b/learn2rag/pipeline/operators/base.py new file mode 100644 index 0000000..f5507e6 --- /dev/null +++ b/learn2rag/pipeline/operators/base.py @@ -0,0 +1,30 @@ +import abc +import logging +import time +from typing import Any + +from ..prov import Activity, Prov + +profilingLogger = logging.getLogger('profiling') + + +class BaseOperator(abc.ABC): + async def __call__(self, inputs: Any, prov: Prov | None = None) -> Any: + if prov is None: + prov = Prov() + label = self.__class__.__name__ + startedAtTime = time.time() + profilingLogger.info('start', extra={'activity': label, 'request_id': prov.id}) + outputs = await self.run(inputs=inputs, prov=prov) + endedAtTime = time.time() + profilingLogger.info('end', extra={'activity': label, 'request_id': prov.id}) + prov.append(Activity( + label=label, + startedAtTime=startedAtTime, + endedAtTime=endedAtTime, + )) + return outputs + + @abc.abstractmethod + async def run(self, inputs: Any, prov: Prov) -> Any: + pass diff --git a/learn2rag/pipeline/operators/basic_pipeline.py b/learn2rag/pipeline/operators/basic_pipeline.py new file mode 100644 index 0000000..abd049c --- /dev/null +++ b/learn2rag/pipeline/operators/basic_pipeline.py @@ -0,0 +1,39 @@ +from operator import itemgetter +from typing import Any, TypedDict + +from ..prov import Prov +from .base import BaseOperator +from .search import SearchOperator +from .generation import GenerationOperator + +Inputs = TypedDict('Inputs', { + 'question': str, + 'user': str | None, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'answer': str, + 'documents': Any, +}, total=True) + + +class BasicPipeline(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + documents = itemgetter('documents')(await SearchOperator()( + inputs={ + 'question': inputs['question'], + 'user': inputs['user'], + }, + prov=prov, + )) + answer = itemgetter('answer')(await GenerationOperator()( + inputs={ + 'question': inputs['question'], + 'documents': documents, + }, + prov=prov, + )) + return { + 'answer': answer, + 'documents': documents, + } diff --git a/learn2rag/pipeline/operators/example.py b/learn2rag/pipeline/operators/example.py new file mode 100644 index 0000000..f14cba3 --- /dev/null +++ b/learn2rag/pipeline/operators/example.py @@ -0,0 +1,25 @@ +from typing import TypedDict +import logging + +from ..prov import Prov +from .base import BaseOperator + +logger = logging.getLogger(__name__) + +Inputs = TypedDict('Inputs', { + 'question': str, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'value': int, +}, total=True) + + +class ExampleOperator(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + logger.info('Amount of steps: %i', len(prov.items)) + logger.info('Last step: %s', prov.items[-1].label) + logger.info('Amount of search steps: %i', sum(1 for item in prov.items if item.label == 'SearchOperator')) + return { + 'value': 42, + } diff --git a/learn2rag/pipeline/operators/example_pipeline.py b/learn2rag/pipeline/operators/example_pipeline.py new file mode 100644 index 0000000..be315c3 --- /dev/null +++ b/learn2rag/pipeline/operators/example_pipeline.py @@ -0,0 +1,41 @@ +from operator import itemgetter +from typing import Any, TypedDict + +from ..prov import Prov +from .base import BaseOperator +from .example import ExampleOperator +from .search import SearchOperator +from .generation import GenerationOperator + +Inputs = TypedDict('Inputs', { + 'question': str, + 'user': str | None, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'answer': str, + 'documents': Any, +}, total=True) + + +class ExamplePipeline(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + documents = itemgetter('documents')(await SearchOperator()( + inputs={ + 'question': inputs['question'], + 'user': inputs['user'], + }, + prov=prov, + )) + answer = itemgetter('answer')(await GenerationOperator()( + inputs={ + 'question': inputs['question'], + 'documents': documents, + }, + prov=prov, + )) + await ExampleOperator()(inputs={'question': inputs['question']}, prov=prov) + return { + 'answer': answer, + 'documents': documents, + } diff --git a/learn2rag/pipeline/operators/generation.py b/learn2rag/pipeline/operators/generation.py new file mode 100644 index 0000000..c90cb01 --- /dev/null +++ b/learn2rag/pipeline/operators/generation.py @@ -0,0 +1,23 @@ +from typing import Any, TypedDict + +from .base import BaseOperator +from ..prov import Prov +from ..config import opt_config +from ..generate import generate + +Inputs = TypedDict('Inputs', { + 'question': str, + 'documents': Any, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'answer': str, +}, total=True) + + +class GenerationOperator(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + answer = generate(inputs['question'], inputs['documents'], opt_config) + return { + 'answer': answer, + } diff --git a/learn2rag/pipeline/operators/search.py b/learn2rag/pipeline/operators/search.py new file mode 100644 index 0000000..0cbafbb --- /dev/null +++ b/learn2rag/pipeline/operators/search.py @@ -0,0 +1,24 @@ +from concurrent.futures import ThreadPoolExecutor +from typing import Any, TypedDict +import asyncio + +from ..prov import Prov +from .base import BaseOperator +from ..search import search_authorized + +Inputs = TypedDict('Inputs', { + 'question': str, + 'user': str, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'documents': Any, +}, total=True) + + +class SearchOperator(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + documents = await search_authorized(question=inputs['question'], user=inputs['user']) + return { + 'documents': documents, + } diff --git a/learn2rag/pipeline/prov.py b/learn2rag/pipeline/prov.py new file mode 100644 index 0000000..bec0dcf --- /dev/null +++ b/learn2rag/pipeline/prov.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass, field +from secrets import token_hex +from typing import Any + + +@dataclass +class Activity(): + label: str + startedAtTime: float + endedAtTime: float + data: Any = field(default_factory=dict) + + +class Prov(): + def __init__(self) -> None: + self.id = token_hex() + self.items: list[Activity] = [] + + def append(self, activity: Activity) -> None: + self.items.append(activity) diff --git a/learn2rag/pipeline/search.py b/learn2rag/pipeline/search.py index e41f16a..a4b43ce 100644 --- a/learn2rag/pipeline/search.py +++ b/learn2rag/pipeline/search.py @@ -16,7 +16,6 @@ from .qdrant import Qdrant from . import rewrite - profilingLogger = logging.getLogger('profiling') @@ -169,7 +168,7 @@ def _collect_query_points( opt_config.get("top_k"), extra={'activity': '_collect_query_points', 'request_id': request_id}, ) - base_results = search(query, user_config, opt_config, request_id=request_id) + base_results = search(query, user_config, opt_config) points_all.extend(base_results.points) if opt_config.get("rewrite") == "True": @@ -197,7 +196,7 @@ def _collect_query_points( ) for sq in subqueries: - sq_results = search(sq, user_config, opt_config_subqueries, request_id=request_id) + sq_results = search(sq, user_config, opt_config_subqueries) points_all.extend(sq_results.points) if rewrite_mode in ["keywords", "subqueries_keywords"]: @@ -217,7 +216,7 @@ def _collect_query_points( ) for kw in keywords: - kw_results = search(kw, user_config, opt_config_keywords, request_id=request_id) + kw_results = search(kw, user_config, opt_config_keywords) points_all.extend(kw_results.points) points = _sort_and_deduplicate(points_all) @@ -257,14 +256,14 @@ def _collect_query_points( # similarity search -def search(query: str, user_config: dict[str, Any], opt_config: dict[str, Any], *, request_id: str | None=None) -> QueryResponse: - profilingLogger.info('start', extra={'activity': 'search', 'request_id': request_id}) +def search(query: str, user_config: dict[str, Any], opt_config: dict[str, Any]) -> QueryResponse: + profilingLogger.info('start', extra={'activity': 'search'}) profilingLogger.info( "search_called query=%r search_mode=%s collection_name=%s", query, opt_config.get("search_mode"), user_config.get("collection_name"), - extra={'activity': '_collect_query_points', 'request_id': request_id}, + extra={'activity': '_collect_query_points'}, ) collection_name = user_config["collection_name"] @@ -384,8 +383,7 @@ def search(query: str, user_config: dict[str, Any], opt_config: dict[str, Any], ) return results - -def search_multi(multi_query: dict[str, str], user_config: dict[str, Any], opt_config: dict[str, Any], request_id: str | None=None) -> QueryResponse: +def search_multi(multi_query: dict[str, str], user_config: dict[str, Any], opt_config: dict[str, Any]) -> QueryResponse: collection_name = user_config["collection_name"] # Init vector store @@ -416,12 +414,10 @@ def search_multi(multi_query: dict[str, str], user_config: dict[str, Any], opt_c using="multi", limit=opt_config["top_k"], ) - profilingLogger.info('end', extra={'activity': 'search', 'request_id': request_id}) return results - -async def search_authorized(question: str, user: str, *, request_id: str | None = None) -> List[ScoredPoint]: - points = _collect_query_points(question, user_config, opt_config, request_id=request_id) +async def search_authorized(question: str, user: str) -> List[ScoredPoint]: + points = _collect_query_points(question, user_config, opt_config) query_response = QueryResponse(points=points) authorized_points = await filter_authorized(user, query_response) # keep deterministic order after auth filter diff --git a/learn2rag/tests/test_learn2rag.py b/learn2rag/tests/test_learn2rag.py index 095de51..dab7d1b 100644 --- a/learn2rag/tests/test_learn2rag.py +++ b/learn2rag/tests/test_learn2rag.py @@ -1,3 +1,4 @@ +import logging import shutil from pathlib import Path from unittest import TestCase @@ -6,7 +7,11 @@ from ..compose import Project from ..utils import is_windows, save_data_path, waitUntil +import pytest from openai import APIConnectionError, OpenAI +from _pytest.logging import LogCaptureFixture + +logger = logging.getLogger(__name__) template_dir = Path(__file__).resolve().parent.parent / 'ui' / 'templates' / 'compose' / 'pipelines' data_dir = Path(__file__).resolve().parent / 'data' @@ -18,6 +23,12 @@ class Learn2RAGTestCase(TestCase): rag_port: int storage_path: Path + @pytest.fixture(autouse=True) + def use_caplog(self, caplog: LogCaptureFixture) -> None: + caplog.set_level(logging.WARNING, logger='httpcore') + caplog.set_level(logging.WARNING, logger='httpx') + caplog.set_level(logging.WARNING, logger='openai') + def setUp(self) -> None: self.project_name = 'test' self.rag_port = 5002 @@ -93,6 +104,7 @@ def check_rag() -> None: ], ) content = completion.choices[-1].message.content + logger.debug('Response content: %s', content) assert 'for testing only' in content, 'contains test marker' assert "Information:\\n" in content, 'contains the prompt' assert not content.endswith("Information:\\n"), 'contains any document chunks in the prompt' diff --git a/learn2rag/ui/templates/compose/pipelines/continuous.yml b/learn2rag/ui/templates/compose/pipelines/continuous.yml index bd4254f..81981a1 100644 --- a/learn2rag/ui/templates/compose/pipelines/continuous.yml +++ b/learn2rag/ui/templates/compose/pipelines/continuous.yml @@ -43,6 +43,8 @@ files: format: "%(log_color)s%(asctime)s %(levelname)-8s %(name)s %(message)s" profiling: format: "%(created)f %(request_id)s %(activity)s %(message)s" + defaults: + request_id: null handlers: display: class: logging.StreamHandler @@ -186,7 +188,8 @@ services: environment: LEARN2RAG_PATH: '{{learn2rag_path}}' STORAGE_PATH: '{{storage_path}}' - QDRANT__SERVICE__HTTP_PORT: '{{ports.qdrant_http}}' + QDRANT_LOCATION: '{{ pipeline.qdrant_location or "http://localhost:" ~ ports.qdrant_http }}' + QDRANT_PATH: '{{ pipeline.qdrant_path }}' QDRANT__SERVICE__API_KEY: '{{qdrant_api_key}}' PIPELINE_USER_CONFIG: '{{storage_path}}/basic_user_config.json' IMPORTER_CONFIG: '{{storage_path}}/importer_config.json' @@ -208,6 +211,7 @@ services: LEARN2RAG_PATH: '{{learn2rag_path}}' LEARN2RAG_PIPELINE_PORT: '{{ports.pipeline}}' QDRANT_LOCATION: '{{ pipeline.qdrant_location or "http://localhost:" ~ ports.qdrant_http }}' + QDRANT_PATH: '{{ pipeline.qdrant_path }}' QDRANT__SERVICE__API_KEY: '{{qdrant_api_key}}' PIPELINE_USER_CONFIG: '{{storage_path}}/basic_user_config.json' IMPORTER_CONFIG: '{{storage_path}}/importer_config.json' diff --git a/learn2rag/ui/templates/compose/pipelines/import.yml b/learn2rag/ui/templates/compose/pipelines/import.yml index fe9302b..6393279 100644 --- a/learn2rag/ui/templates/compose/pipelines/import.yml +++ b/learn2rag/ui/templates/compose/pipelines/import.yml @@ -40,6 +40,8 @@ files: format: "%(log_color)s%(asctime)s %(levelname)-8s %(name)s %(message)s" profiling: format: "%(created)f %(request_id)s %(activity)s %(message)s" + defaults: + request_id: null handlers: display: class: logging.StreamHandler diff --git a/learn2rag/ui/templates/compose/pipelines/pipeline.yml b/learn2rag/ui/templates/compose/pipelines/pipeline.yml index 1c77961..b113317 100644 --- a/learn2rag/ui/templates/compose/pipelines/pipeline.yml +++ b/learn2rag/ui/templates/compose/pipelines/pipeline.yml @@ -41,6 +41,8 @@ files: format: "%(log_color)s%(asctime)s %(levelname)-8s %(name)s %(message)s" profiling: format: "%(created)f %(request_id)s %(activity)s %(message)s" + defaults: + request_id: null handlers: display: class: logging.StreamHandler