From db79e7c5459acd8bb7dd3ae82cb842d7be919756 Mon Sep 17 00:00:00 2001 From: denkv Date: Tue, 28 Apr 2026 10:25:47 +0200 Subject: [PATCH 1/3] Add operators and provenance --- learn2rag/pipeline/app.py | 64 +++++-------------- learn2rag/pipeline/generate.py | 7 +- learn2rag/pipeline/main.py | 4 +- learn2rag/pipeline/operators/__init__.py | 5 ++ learn2rag/pipeline/operators/base.py | 30 +++++++++ .../pipeline/operators/basic_pipeline.py | 37 +++++++++++ learn2rag/pipeline/operators/generation.py | 23 +++++++ learn2rag/pipeline/operators/search.py | 24 +++++++ learn2rag/pipeline/prov.py | 20 ++++++ learn2rag/pipeline/search.py | 14 ++-- 10 files changed, 162 insertions(+), 66 deletions(-) create mode 100644 learn2rag/pipeline/operators/__init__.py create mode 100644 learn2rag/pipeline/operators/base.py create mode 100644 learn2rag/pipeline/operators/basic_pipeline.py create mode 100644 learn2rag/pipeline/operators/generation.py create mode 100644 learn2rag/pipeline/operators/search.py create mode 100644 learn2rag/pipeline/prov.py diff --git a/learn2rag/pipeline/app.py b/learn2rag/pipeline/app.py index d6958f5..f8fc6c4 100644 --- a/learn2rag/pipeline/app.py +++ b/learn2rag/pipeline/app.py @@ -1,9 +1,7 @@ -import asyncio import json import logging -import secrets -from concurrent.futures import ThreadPoolExecutor -from typing import Any, AsyncGenerator, Generator, List, Optional +from operator import itemgetter +from typing import Any, AsyncGenerator, List, Optional from fastapi import FastAPI, Body, Request, status from fastapi.exceptions import RequestValidationError @@ -11,10 +9,12 @@ from pydantic import BaseModel from qdrant_client.models import ScoredPoint -from . import generate from . import ingestion from .config import user_config, opt_config from .search import search_authorized +from .operators import BasicPipeline + +pipeline = BasicPipeline() class QuestionInput(BaseModel): @@ -30,17 +30,16 @@ class Message(BaseModel): class ChatState(BaseModel): messages: List[Message] stream: Optional[bool] = False - user: str | None = 'anonymous' # FIXME use https://developers.openai.com/api/docs/guides/safety-best-practices#safety-identifiers + user: str = 'anonymous' # FIXME use https://developers.openai.com/api/docs/guides/safety-best-practices#safety-identifiers class TestResponse(BaseModel): message: str async def simple_chatbot_response(input: QuestionInput) -> Any: - results = await search_authorized(question=input.question, user=input.user) - # sources = "\n".join(set(result.payload['path'] for result in results)) - answer = generate.generate(input.question, results, opt_config) - # full_response = f"{answer}\n\n{sources}" - return answer # full_response + return itemgetter('answer')(await pipeline(inputs={ + 'question': input.question, + 'user': input.user, + })) example_query = "What approach did Arjun Singh's campaign use to respond to voters' concerns on social media platforms during the municipal elections in Delhi?" @@ -99,50 +98,19 @@ async def stream( async def event_stream(inputs: ChatState) -> AsyncGenerator[Any, Any]: - request_id = secrets.token_hex() try: question = inputs.messages[-1].content if not inputs.user: raise ValueError("User Missing") - results = await search_authorized(user=inputs.user, question=question, request_id=request_id) - # sources = "\n".join(set(result.payload['path'] for result in results)) - - executor = ThreadPoolExecutor() - loop = asyncio.get_event_loop() - - def sync_gen() -> Generator[str, Any, None]: - for chunk in generate.generate_stream(question, results, opt_config, request_id=request_id): - yield chunk - - chunks = await loop.run_in_executor(executor, lambda: list(sync_gen())) + answer = itemgetter('answer')(await pipeline(inputs={ + 'question': question, + 'user': inputs.user, + })) - yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': None}]})}\n\n" - - for chunk in chunks: - msg = { - "choices": [ - { - "delta": {"content": chunk}, - "finish_reason": None - } - ] - } - yield f"data: {json.dumps(msg)}\n\n" - # await asyncio.sleep(0.1) # delay for stream check - - # msg = { - # "choices": [ - # { - # "delta": {"content": "\n\n" + sources}, - # "finish_reason": None - # } - # ] - # } - # yield f"data: {json.dumps(msg)}\n\n" - - yield f"data: {json.dumps({'choices': [{'delta': {}, 'finish_reason': 'stop'}]})}\n\n" + delta = {'content': answer} + yield f"data: {json.dumps({'choices': [{'delta': delta, 'finish_reason': 'stop'}]})}\n\n" except Exception as e: logging.error('%s: %s', e.__class__, e) content = 'There is a problem with Learn2RAG configuration. Please contact your administrator.' # FIXME diff --git a/learn2rag/pipeline/generate.py b/learn2rag/pipeline/generate.py index 64e3efc..e07a20b 100644 --- a/learn2rag/pipeline/generate.py +++ b/learn2rag/pipeline/generate.py @@ -5,8 +5,6 @@ from .llm import llm -profilingLogger = logging.getLogger('profiling') - context_template =""" ----- Source: {source} @@ -27,8 +25,7 @@ def generate(query: str, search_results: list[ScoredPoint], opt_config: dict[str return answer.content -def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: dict[str, Any], request_id: str | None=None) -> Generator[str, None, None]: - profilingLogger.info('start', extra={'activity': 'generate', 'request_id': request_id}) +def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: dict[str, Any]) -> Generator[str, None, None]: assert llm is not None if hasattr(search_results, "points"): @@ -44,5 +41,3 @@ def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: d text_chunk = chunk.text() if text_chunk: yield text_chunk - - profilingLogger.info('end', extra={'activity': 'generate', 'request_id': request_id}) diff --git a/learn2rag/pipeline/main.py b/learn2rag/pipeline/main.py index aea86fd..51439b5 100755 --- a/learn2rag/pipeline/main.py +++ b/learn2rag/pipeline/main.py @@ -22,12 +22,12 @@ if opt_config["query_mode"] == "multi": # in query_mode 'multi' different querys for each vector in the multi-vector are allowed multi_query = {"content": "What is USM AI?", "title": "What is USM AI?", "summary": "What is USM AI?", "source_path":"USU/ITSM/"} - results = search.search_multi(multi_query, user_config, opt_config, request_id=None) + results = search.search_multi(multi_query, user_config, opt_config) # modify the query for generation part query = " ".join(f"{k}={v}" for k, v in multi_query.items()) else: query = "What is USM AI?" #"What approach did Arjun Singh's campaign use to respond to voters' concerns on social media platforms during the municipal elections in Delhi?" - results = search.search(query, user_config, opt_config, request_id=None) + results = search.search(query, user_config, opt_config) points = results.points diff --git a/learn2rag/pipeline/operators/__init__.py b/learn2rag/pipeline/operators/__init__.py new file mode 100644 index 0000000..8ca11e7 --- /dev/null +++ b/learn2rag/pipeline/operators/__init__.py @@ -0,0 +1,5 @@ +from .basic_pipeline import BasicPipeline + +__all__ = [ + 'BasicPipeline', +] diff --git a/learn2rag/pipeline/operators/base.py b/learn2rag/pipeline/operators/base.py new file mode 100644 index 0000000..f5507e6 --- /dev/null +++ b/learn2rag/pipeline/operators/base.py @@ -0,0 +1,30 @@ +import abc +import logging +import time +from typing import Any + +from ..prov import Activity, Prov + +profilingLogger = logging.getLogger('profiling') + + +class BaseOperator(abc.ABC): + async def __call__(self, inputs: Any, prov: Prov | None = None) -> Any: + if prov is None: + prov = Prov() + label = self.__class__.__name__ + startedAtTime = time.time() + profilingLogger.info('start', extra={'activity': label, 'request_id': prov.id}) + outputs = await self.run(inputs=inputs, prov=prov) + endedAtTime = time.time() + profilingLogger.info('end', extra={'activity': label, 'request_id': prov.id}) + prov.append(Activity( + label=label, + startedAtTime=startedAtTime, + endedAtTime=endedAtTime, + )) + return outputs + + @abc.abstractmethod + async def run(self, inputs: Any, prov: Prov) -> Any: + pass diff --git a/learn2rag/pipeline/operators/basic_pipeline.py b/learn2rag/pipeline/operators/basic_pipeline.py new file mode 100644 index 0000000..1999201 --- /dev/null +++ b/learn2rag/pipeline/operators/basic_pipeline.py @@ -0,0 +1,37 @@ +from operator import itemgetter +from typing import TypedDict + +from ..prov import Prov +from .base import BaseOperator +from .search import SearchOperator +from .generation import GenerationOperator + +Inputs = TypedDict('Inputs', { + 'question': str, + 'user': str | None, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'answer': str, +}, total=True) + + +class BasicPipeline(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + documents = itemgetter('documents')(await SearchOperator()( + inputs={ + 'question': inputs['question'], + 'user': inputs['user'], + }, + prov=prov, + )) + answer = itemgetter('answer')(await GenerationOperator()( + inputs={ + 'question': inputs['question'], + 'documents': documents, + }, + prov=prov, + )) + return { + 'answer': answer, + } diff --git a/learn2rag/pipeline/operators/generation.py b/learn2rag/pipeline/operators/generation.py new file mode 100644 index 0000000..c90cb01 --- /dev/null +++ b/learn2rag/pipeline/operators/generation.py @@ -0,0 +1,23 @@ +from typing import Any, TypedDict + +from .base import BaseOperator +from ..prov import Prov +from ..config import opt_config +from ..generate import generate + +Inputs = TypedDict('Inputs', { + 'question': str, + 'documents': Any, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'answer': str, +}, total=True) + + +class GenerationOperator(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + answer = generate(inputs['question'], inputs['documents'], opt_config) + return { + 'answer': answer, + } diff --git a/learn2rag/pipeline/operators/search.py b/learn2rag/pipeline/operators/search.py new file mode 100644 index 0000000..0cbafbb --- /dev/null +++ b/learn2rag/pipeline/operators/search.py @@ -0,0 +1,24 @@ +from concurrent.futures import ThreadPoolExecutor +from typing import Any, TypedDict +import asyncio + +from ..prov import Prov +from .base import BaseOperator +from ..search import search_authorized + +Inputs = TypedDict('Inputs', { + 'question': str, + 'user': str, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'documents': Any, +}, total=True) + + +class SearchOperator(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + documents = await search_authorized(question=inputs['question'], user=inputs['user']) + return { + 'documents': documents, + } diff --git a/learn2rag/pipeline/prov.py b/learn2rag/pipeline/prov.py new file mode 100644 index 0000000..bec0dcf --- /dev/null +++ b/learn2rag/pipeline/prov.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass, field +from secrets import token_hex +from typing import Any + + +@dataclass +class Activity(): + label: str + startedAtTime: float + endedAtTime: float + data: Any = field(default_factory=dict) + + +class Prov(): + def __init__(self) -> None: + self.id = token_hex() + self.items: list[Activity] = [] + + def append(self, activity: Activity) -> None: + self.items.append(activity) diff --git a/learn2rag/pipeline/search.py b/learn2rag/pipeline/search.py index fdc5682..a16dbcf 100644 --- a/learn2rag/pipeline/search.py +++ b/learn2rag/pipeline/search.py @@ -15,13 +15,8 @@ from .qdrant import Qdrant - -profilingLogger = logging.getLogger('profiling') - - # similarity search -def search(query: str, user_config: dict[str, Any], opt_config: dict[str, Any], *, request_id: str | None=None) -> QueryResponse: - profilingLogger.info('start', extra={'activity': 'search', 'request_id': request_id}) +def search(query: str, user_config: dict[str, Any], opt_config: dict[str, Any]) -> QueryResponse: collection_name = user_config["collection_name"] if opt_config["fusion_mode"] == "RRF": @@ -188,7 +183,7 @@ def search(query: str, user_config: dict[str, Any], opt_config: dict[str, Any], results.points = points return results -def search_multi(multi_query: dict[str, str], user_config: dict[str, Any], opt_config: dict[str, Any], request_id: str | None=None) -> QueryResponse: +def search_multi(multi_query: dict[str, str], user_config: dict[str, Any], opt_config: dict[str, Any]) -> QueryResponse: collection_name = user_config["collection_name"] # Init vector store @@ -219,10 +214,9 @@ def search_multi(multi_query: dict[str, str], user_config: dict[str, Any], opt_c using="multi", limit=opt_config["top_k"], ) - profilingLogger.info('end', extra={'activity': 'search', 'request_id': request_id}) return results -async def search_authorized(question: str, user: str, *, request_id: str|None=None) -> List[ScoredPoint]: - query_response = search(question, user_config, opt_config, request_id=request_id) +async def search_authorized(question: str, user: str) -> List[ScoredPoint]: + query_response = search(question, user_config, opt_config) authorized_points = await filter_authorized(user, query_response) return authorized_points From be38f1c37e03ac64c996cc72904ca4f3064e2c6f Mon Sep 17 00:00:00 2001 From: denkv Date: Tue, 28 Apr 2026 14:48:12 +0200 Subject: [PATCH 2/3] Use BasicPipeline in learn2rag/pipeline/main --- learn2rag/pipeline/main.py | 19 +++++++++++++------ .../pipeline/operators/basic_pipeline.py | 4 +++- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/learn2rag/pipeline/main.py b/learn2rag/pipeline/main.py index 51439b5..e40ae6f 100755 --- a/learn2rag/pipeline/main.py +++ b/learn2rag/pipeline/main.py @@ -1,15 +1,18 @@ +import asyncio import logging import logging.config import os import yaml import json +from operator import itemgetter from . import ingestion from . import search from . import generate +from .operators import BasicPipeline -if __name__ == "__main__": +async def main() -> None: try: logging.config.dictConfig(yaml.safe_load(open("logging.yaml").read())) except FileNotFoundError: @@ -25,19 +28,23 @@ results = search.search_multi(multi_query, user_config, opt_config) # modify the query for generation part query = " ".join(f"{k}={v}" for k, v in multi_query.items()) + points = results.points + answer = generate.generate(query, points, opt_config) else: + pipeline = BasicPipeline() query = "What is USM AI?" #"What approach did Arjun Singh's campaign use to respond to voters' concerns on social media platforms during the municipal elections in Delhi?" - results = search.search(query, user_config, opt_config) - - points = results.points + answer, points = itemgetter('answer', 'documents')(await pipeline( + inputs={'question': query, 'user': 'anonymous'}, + )) sources = "\n".join(set(point.payload['path'] for point in points)) # type: ignore[index] for point in points: print(f"ID: {point.id}, Path: {point.payload['path']}, Score: {point.score}") # type: ignore[index] - answer = generate.generate(query, points, opt_config) - print(query) print(answer) print(sources) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/learn2rag/pipeline/operators/basic_pipeline.py b/learn2rag/pipeline/operators/basic_pipeline.py index 1999201..abd049c 100644 --- a/learn2rag/pipeline/operators/basic_pipeline.py +++ b/learn2rag/pipeline/operators/basic_pipeline.py @@ -1,5 +1,5 @@ from operator import itemgetter -from typing import TypedDict +from typing import Any, TypedDict from ..prov import Prov from .base import BaseOperator @@ -13,6 +13,7 @@ Outputs = TypedDict('Outputs', { 'answer': str, + 'documents': Any, }, total=True) @@ -34,4 +35,5 @@ async def run(self, inputs: Inputs, prov: Prov) -> Outputs: )) return { 'answer': answer, + 'documents': documents, } From 7a1394614e9ad6eef6ee8f4271cbfd148f75a9d1 Mon Sep 17 00:00:00 2001 From: denkv Date: Mon, 15 Jun 2026 10:30:20 +0200 Subject: [PATCH 3/3] Add an example of using the recorded prov data --- learn2rag/pipeline/app.py | 3 +- learn2rag/pipeline/operators/__init__.py | 2 + learn2rag/pipeline/operators/example.py | 25 +++++++++++ .../pipeline/operators/example_pipeline.py | 41 +++++++++++++++++++ 4 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 learn2rag/pipeline/operators/example.py create mode 100644 learn2rag/pipeline/operators/example_pipeline.py diff --git a/learn2rag/pipeline/app.py b/learn2rag/pipeline/app.py index cd65c1e..0fa9148 100644 --- a/learn2rag/pipeline/app.py +++ b/learn2rag/pipeline/app.py @@ -20,8 +20,9 @@ from .qdrant import Qdrant from .search import search_authorized from .operators import BasicPipeline +from .operators.base import BaseOperator -pipeline = BasicPipeline() +pipeline: BaseOperator = BasicPipeline() class QuestionInput(BaseModel): diff --git a/learn2rag/pipeline/operators/__init__.py b/learn2rag/pipeline/operators/__init__.py index 8ca11e7..06a2cfa 100644 --- a/learn2rag/pipeline/operators/__init__.py +++ b/learn2rag/pipeline/operators/__init__.py @@ -1,5 +1,7 @@ from .basic_pipeline import BasicPipeline +from .example_pipeline import ExamplePipeline __all__ = [ 'BasicPipeline', + 'ExamplePipeline', ] diff --git a/learn2rag/pipeline/operators/example.py b/learn2rag/pipeline/operators/example.py new file mode 100644 index 0000000..f14cba3 --- /dev/null +++ b/learn2rag/pipeline/operators/example.py @@ -0,0 +1,25 @@ +from typing import TypedDict +import logging + +from ..prov import Prov +from .base import BaseOperator + +logger = logging.getLogger(__name__) + +Inputs = TypedDict('Inputs', { + 'question': str, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'value': int, +}, total=True) + + +class ExampleOperator(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + logger.info('Amount of steps: %i', len(prov.items)) + logger.info('Last step: %s', prov.items[-1].label) + logger.info('Amount of search steps: %i', sum(1 for item in prov.items if item.label == 'SearchOperator')) + return { + 'value': 42, + } diff --git a/learn2rag/pipeline/operators/example_pipeline.py b/learn2rag/pipeline/operators/example_pipeline.py new file mode 100644 index 0000000..be315c3 --- /dev/null +++ b/learn2rag/pipeline/operators/example_pipeline.py @@ -0,0 +1,41 @@ +from operator import itemgetter +from typing import Any, TypedDict + +from ..prov import Prov +from .base import BaseOperator +from .example import ExampleOperator +from .search import SearchOperator +from .generation import GenerationOperator + +Inputs = TypedDict('Inputs', { + 'question': str, + 'user': str | None, +}, total=True) + +Outputs = TypedDict('Outputs', { + 'answer': str, + 'documents': Any, +}, total=True) + + +class ExamplePipeline(BaseOperator): + async def run(self, inputs: Inputs, prov: Prov) -> Outputs: + documents = itemgetter('documents')(await SearchOperator()( + inputs={ + 'question': inputs['question'], + 'user': inputs['user'], + }, + prov=prov, + )) + answer = itemgetter('answer')(await GenerationOperator()( + inputs={ + 'question': inputs['question'], + 'documents': documents, + }, + prov=prov, + )) + await ExampleOperator()(inputs={'question': inputs['question']}, prov=prov) + return { + 'answer': answer, + 'documents': documents, + }