From a352c0d36137a44a17f02488fe8f60f6372a19b2 Mon Sep 17 00:00:00 2001 From: Feng Peng Date: Tue, 1 Apr 2025 10:40:58 -0700 Subject: [PATCH] Fix the log output display (#1630) (#1645) * Make the log display better. * Fix topic list log output. --------- Co-authored-by: Feng Peng --- .../_impl/duckdb/history_manager_duckdb.py | 29 ++++++++++--- src/leettools/cli/llm/llm_inference.py | 2 +- src/leettools/flow/iterators/extract_kb.py | 4 +- src/leettools/flow/iterators/summarize.py | 2 +- src/leettools/flow/steps/step_extract_info.py | 2 +- src/leettools/flow/steps/step_gen_intro.py | 2 +- .../flow/steps/step_gen_search_phrases.py | 2 +- src/leettools/flow/steps/step_gen_section.py | 2 +- src/leettools/flow/steps/step_inference.py | 2 +- src/leettools/flow/steps/step_intention.py | 8 ++-- src/leettools/flow/steps/step_plan_topic.py | 10 +++-- .../flow/steps/step_query_rewrite.py | 6 +-- src/leettools/flow/steps/step_rerank.py | 2 +- src/leettools/flow/steps/step_scrape_urls.py | 2 +- .../flow/steps/step_search_medium.py | 11 +++-- .../flow/steps/step_search_to_docsource.py | 6 +-- src/leettools/flow/steps/step_summarize.py | 2 +- .../flow/steps/step_vectdb_search.py | 2 +- src/leettools/settings.py | 5 +++ .../svc/api/v1/routers/chat_router.py | 42 +++++++++++++++---- src/leettools/web/web_searcher.py | 12 ++++-- 21 files changed, 107 insertions(+), 48 deletions(-) diff --git a/src/leettools/chat/_impl/duckdb/history_manager_duckdb.py b/src/leettools/chat/_impl/duckdb/history_manager_duckdb.py index cadd683..9261fee 100644 --- a/src/leettools/chat/_impl/duckdb/history_manager_duckdb.py +++ b/src/leettools/chat/_impl/duckdb/history_manager_duckdb.py @@ -838,7 +838,9 @@ def run_query_item( query_id=chat_query_item.query_id, ) try: - query_logger.info(f"[Status]Query started: {chat_query_item.query_content}") + query_logger.info( + f"[Status] Query started: {chat_query_item.query_content}" + ) chat_query_result_create: ChatQueryResultCreate = ( self._execute_flow_for_query( org=org, @@ -849,7 +851,7 @@ def run_query_item( ) ) if chat_query_result_create is not None: - query_logger.info("[Status]Saving results.") + query_logger.info("[Status] Saving results.") chat_query_result = self._add_answers_to_chat( org=org, kb=kb, @@ -857,17 +859,34 @@ def run_query_item( chat_query_item=chat_query_item, chat_query_result_create=chat_query_result_create, ) - query_logger.info("[Status]Query completed.") + query_logger.info("[Status] Query completed.") self._update_kb_timestamp(org, kb) return chat_query_result else: # chat_query_result_create is None - query_logger.info("[Status]Query failed or not completed.") + query_logger.info("[Status] Query failed or not completed.") return None finally: end_time = time.perf_counter() elapsed_time = end_time - start_time - query_logger.info(f"[Query Runtime]{elapsed_time} seconds.") + + # Convert to minutes, seconds, milliseconds + minutes = int(elapsed_time // 60) + seconds = int(elapsed_time % 60) + milliseconds = int((elapsed_time * 1000) % 1000) + + runtime_parts = [] + if minutes > 0: + runtime_parts.append(f"{minutes} minutes") + if seconds > 0: + runtime_parts.append(f"{seconds} seconds") + if ( + milliseconds > 0 or not runtime_parts + ): # Show ms if no other units or has ms + runtime_parts.append(f"{milliseconds} milliseconds") + + runtime_str = " ".join(runtime_parts) + query_logger.info(f"[Runtime] {runtime_str}") remove_logger(logger_name) def update_ch_entry(self, ch_update: CHUpdate) -> Optional[ChatHistory]: diff --git a/src/leettools/cli/llm/llm_inference.py b/src/leettools/cli/llm/llm_inference.py index 909f1e2..0c3b906 100644 --- a/src/leettools/cli/llm/llm_inference.py +++ b/src/leettools/cli/llm/llm_inference.py @@ -82,7 +82,7 @@ def inference_func( if inference_section is None: raise exceptions.UnexpectedCaseException("Inference section is None.") - display_logger.info(f"[Status]Running inference for query {query}.") + display_logger.info(f"[Status] Running inference for query {query}.") llm_cli_tool = LLMCliTool(context, user, inference_section, display_logger) diff --git a/src/leettools/flow/iterators/extract_kb.py b/src/leettools/flow/iterators/extract_kb.py index 36c5718..8c0124a 100644 --- a/src/leettools/flow/iterators/extract_kb.py +++ b/src/leettools/flow/iterators/extract_kb.py @@ -96,7 +96,7 @@ def run( display_logger = exec_info.display_logger display_logger.info( - "[Status]Extracting information from documents in the knowledgebase ..." + "[Status] Extracting information from documents in the knowledgebase ..." ) if save_to_backend: @@ -145,7 +145,7 @@ def run( ) display_logger.info( - f"[Status]ExtractKB from document {document.original_uri} ..." + f"[Status] ExtractKB from document {document.original_uri} ..." ) extracted_obj_list = steps.StepExtractInfo.run_step( exec_info=exec_info, diff --git a/src/leettools/flow/iterators/summarize.py b/src/leettools/flow/iterators/summarize.py index 873bafb..9636a6c 100644 --- a/src/leettools/flow/iterators/summarize.py +++ b/src/leettools/flow/iterators/summarize.py @@ -59,7 +59,7 @@ def run( url. """ display_logger = exec_info.display_logger - display_logger.info("[Status]Summarizing documents for metadata ...") + display_logger.info("[Status] Summarizing documents for metadata ...") successful_documents: Dict[str, Document] = {} diff --git a/src/leettools/flow/steps/step_extract_info.py b/src/leettools/flow/steps/step_extract_info.py index 17411e2..60655d0 100644 --- a/src/leettools/flow/steps/step_extract_info.py +++ b/src/leettools/flow/steps/step_extract_info.py @@ -97,7 +97,7 @@ def run_step( """ display_logger = exec_info.display_logger display_logger.info( - f"[Status]StepExtractInfo: extract {model_class_name} from content." + f"[Status] StepExtractInfo: extract {model_class_name} from content." ) flow_options = exec_info.flow_options diff --git a/src/leettools/flow/steps/step_gen_intro.py b/src/leettools/flow/steps/step_gen_intro.py index d363986..a7f6a00 100644 --- a/src/leettools/flow/steps/step_gen_intro.py +++ b/src/leettools/flow/steps/step_gen_intro.py @@ -56,7 +56,7 @@ def _step_gen_intro_section( ) -> ArticleSection: display_logger = exec_info.display_logger - display_logger.info("[Status]Generating introduction.") + display_logger.info("[Status] Generating introduction.") query = exec_info.target_chat_query_item.query_content diff --git a/src/leettools/flow/steps/step_gen_search_phrases.py b/src/leettools/flow/steps/step_gen_search_phrases.py index 90e834c..b806edc 100644 --- a/src/leettools/flow/steps/step_gen_search_phrases.py +++ b/src/leettools/flow/steps/step_gen_search_phrases.py @@ -83,7 +83,7 @@ def run_step( query = exec_info.target_chat_query_item.query_content display_logger = exec_info.display_logger - display_logger.info("[Status]Generating web search phrases.") + display_logger.info("[Status] Generating web search phrases.") search_lang = flow_utils.get_search_lang( exec_info=exec_info, query_metadata=query_metadata diff --git a/src/leettools/flow/steps/step_gen_section.py b/src/leettools/flow/steps/step_gen_section.py index de9b377..c1b9c65 100644 --- a/src/leettools/flow/steps/step_gen_section.py +++ b/src/leettools/flow/steps/step_gen_section.py @@ -68,7 +68,7 @@ def run_step( - The generated section. """ display_logger = exec_info.display_logger - display_logger.info(f"[Status]Generating section {section_plan.title}.") + display_logger.info(f"[Status] Generating section {section_plan.title}.") api_caller = exec_info.get_inference_caller() diff --git a/src/leettools/flow/steps/step_inference.py b/src/leettools/flow/steps/step_inference.py index 7195332..4f605f8 100644 --- a/src/leettools/flow/steps/step_inference.py +++ b/src/leettools/flow/steps/step_inference.py @@ -59,7 +59,7 @@ def run_step( if inference_section is None: raise UnexpectedCaseException("Inference section is None.") - display_logger.info(f"[Status]Running inference for query {query}.") + display_logger.info(f"[Status] Running inference for query {query}.") inference = get_inference_by_strategy( context, user, inference_section, display_logger diff --git a/src/leettools/flow/steps/step_intention.py b/src/leettools/flow/steps/step_intention.py index ff9baa1..1f040ee 100644 --- a/src/leettools/flow/steps/step_intention.py +++ b/src/leettools/flow/steps/step_intention.py @@ -48,7 +48,7 @@ def _run_intention( query_metadata = ChatQueryMetadata(intention=DEFAULT_INTENTION) if intention_section is None: display_logger.info( - "Intention section is not provided. Using the default intention." + "[Update] Intention section is not provided. Using the default intention." ) return query_metadata @@ -62,7 +62,7 @@ def _run_intention( ) return query_metadata - display_logger.info("[Status]Getting intention for the query.") + display_logger.info("[Status] Getting intention for the query.") intention_getter = get_intention_getter_by_strategy( context=context, @@ -71,5 +71,7 @@ def _run_intention( display_logger=display_logger, ) query_metadata = intention_getter.get_intention(query) - display_logger.info(f"The intention for original query is: {query_metadata}") + display_logger.info( + f"[Update] The intention for original query is: {query_metadata}" + ) return query_metadata diff --git a/src/leettools/flow/steps/step_plan_topic.py b/src/leettools/flow/steps/step_plan_topic.py index af387ee..b725445 100644 --- a/src/leettools/flow/steps/step_plan_topic.py +++ b/src/leettools/flow/steps/step_plan_topic.py @@ -2,6 +2,7 @@ from typing import ClassVar, Dict, List, Optional, Type from leettools.common import exceptions +from leettools.common.logging.event_logger import EventLogger from leettools.common.utils import config_utils, template_eval from leettools.core.consts import flow_option from leettools.core.schemas.chat_query_metadata import ChatQueryMetadata @@ -148,7 +149,7 @@ def _step_plan_topic_for_style( - The list of topics. """ display_logger = exec_info.display_logger - display_logger.info("[Status]Planning topics for research article.") + display_logger.info("[Status] Planning topics for research article.") if num_of_sections is None or num_of_sections == 0: num_of_section_instruction = "generate a list of most relevant topics" @@ -238,7 +239,7 @@ def _step_plan_topic_for_style( override_model_name=planning_model, ) - return _parse_topic_list(response_str) + return _parse_topic_list(response_str, display_logger) except Exception as e: display_logger.error(f"Failed to generate topic list: {e}") if response_str is not None: @@ -248,7 +249,7 @@ def _step_plan_topic_for_style( ) -def _parse_topic_list(response_str: str) -> TopicList: +def _parse_topic_list(response_str: str, display_logger: EventLogger) -> TopicList: """Parse a string response into a TopicList object. Args: @@ -303,4 +304,7 @@ def _parse_topic_list(response_str: str) -> TopicList: ) topic_list = TopicList.model_validate(final_obj) + display_logger.info( + f"[Update] The topic list is:\n{json.dumps(final_obj, indent=2)}" + ) return topic_list diff --git a/src/leettools/flow/steps/step_query_rewrite.py b/src/leettools/flow/steps/step_query_rewrite.py index 93402d9..c4c0d6e 100644 --- a/src/leettools/flow/steps/step_query_rewrite.py +++ b/src/leettools/flow/steps/step_query_rewrite.py @@ -33,7 +33,7 @@ def run_step( """ display_logger = exec_info.display_logger display_logger.info( - f"[Status]Rewrite query: {exec_info.target_chat_query_item.query_content}" + f"[Status] Rewrite query: {exec_info.target_chat_query_item.query_content}" ) rewrite_section = exec_info.strategy.strategy_sections.get( StrategySectionName.REWRITE, None @@ -69,7 +69,7 @@ def _step_run_rewriter( display_logger.info("Rewrite strategy is disabled. Skip rewriting.") return Rewrite(rewritten_question=rewritten_query) - display_logger.info("[Status]Rewriting the query.") + display_logger.info("[Status] Rewriting the query.") query_rewriter = get_query_rewriter_by_strategy( context=context, @@ -83,5 +83,5 @@ def _step_run_rewriter( query_item=query_item, query_metadata=query_metadata, ) - display_logger.info(f"Rewritten result is: {rewrite}") + display_logger.info(f"[Update] Rewritten result is: {rewrite}") return rewrite diff --git a/src/leettools/flow/steps/step_rerank.py b/src/leettools/flow/steps/step_rerank.py index 89f67a4..812c36a 100644 --- a/src/leettools/flow/steps/step_rerank.py +++ b/src/leettools/flow/steps/step_rerank.py @@ -61,7 +61,7 @@ def _run_rerank( display_logger.info(f"Rerank is disabled. Skip reranking.") return top_ranked_result_segments - display_logger.info("[Status]Rerank the search results.") + display_logger.info("[Status] Rerank the search results.") try: original_top_ranked_result_segments = top_ranked_result_segments.copy() reranker = create_reranker_by_strategy( diff --git a/src/leettools/flow/steps/step_scrape_urls.py b/src/leettools/flow/steps/step_scrape_urls.py index ab0d1a6..95181f6 100644 --- a/src/leettools/flow/steps/step_scrape_urls.py +++ b/src/leettools/flow/steps/step_scrape_urls.py @@ -67,7 +67,7 @@ def run_step( kb = exec_info.kb query = exec_info.target_chat_query_item.query_content - display_logger.info(f"[Status]Scraping {len(links)} URLs.") + display_logger.info(f"[Status] Scraping {len(links)} URLs.") docsink_create_list = web_searcher.scrape_urls_to_docsinks( query=query, diff --git a/src/leettools/flow/steps/step_search_medium.py b/src/leettools/flow/steps/step_search_medium.py index c8da6e3..c89bd0a 100644 --- a/src/leettools/flow/steps/step_search_medium.py +++ b/src/leettools/flow/steps/step_search_medium.py @@ -1,12 +1,11 @@ import json -import requests - -from datetime import datetime, timezone - from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timezone from functools import partial from typing import ClassVar, Dict, List, Optional, Type +import requests + from leettools.common import exceptions from leettools.common.logging import EventLogger, logger from leettools.flow import flow_option_items @@ -15,8 +14,8 @@ from leettools.flow.flow_option_items import FlowOptionItem from leettools.flow.schemas.medium_article import MediumArticle from leettools.flow.step import AbstractStep -from leettools.web.web_searcher import WebSearcher from leettools.web.web_scraper import WebScraper +from leettools.web.web_searcher import WebSearcher ID_ATTR = "id" SNIPPET_ATTR = "body" @@ -78,7 +77,7 @@ def run_step( if search_keywords is None: search_keywords = exec_info.target_chat_query_item.query_content - display_logger.info("[Status]Start the medium search pipeline ...") + display_logger.info("[Status] Start the medium search pipeline ...") medium_articles = _run_medium_search_pipeline(exec_info, search_keywords) display_logger.info( f"Successfully find {len(medium_articles)} " diff --git a/src/leettools/flow/steps/step_search_to_docsource.py b/src/leettools/flow/steps/step_search_to_docsource.py index 2aaaac5..bc3bdf3 100644 --- a/src/leettools/flow/steps/step_search_to_docsource.py +++ b/src/leettools/flow/steps/step_search_to_docsource.py @@ -136,7 +136,7 @@ def run_step( assert len(updated_docsources) == 1 return updated_docsources[0] - display_logger.info("[Status]Start the search to docsource pipeline ...") + display_logger.info("[Status] Start the search to docsource pipeline ...") try: # if the kb.auto_schedule is False, we should run the process manually success_documents = _run_web_search_pipeline( @@ -145,7 +145,7 @@ def run_step( search_keywords=search_keywords, ) display_logger.info( - f"Successfully ingested {len(success_documents)} documents from search." + f"[Update] Successfully ingested {len(success_documents)} documents from search." ) return docsource except Exception as e: @@ -232,7 +232,7 @@ def _create_docsrc_for_search( display_logger=display_logger, ) - display_logger.info(f"[Status]Searching the web with {retriever_type} ...") + display_logger.info(f"[Status] Searching the web with {retriever_type} ...") # use yyyy-mm-dd-hh-mm-ss to the URI to distinguish different searches timestamp = time_utils.current_datetime().strftime("%Y-%m-%d-%H-%M-%S") diff --git a/src/leettools/flow/steps/step_summarize.py b/src/leettools/flow/steps/step_summarize.py index 4ad27fc..47e167f 100644 --- a/src/leettools/flow/steps/step_summarize.py +++ b/src/leettools/flow/steps/step_summarize.py @@ -119,7 +119,7 @@ def run_step( context = exec_info.context document_store = context.get_repo_manager().get_document_store() - display_logger.info(f"[Status]Summarizing document {document.original_uri}.") + display_logger.info(f"[Status] Summarizing document {document.original_uri}.") if document.embed_status != DocSourceStatus.COMPLETED: display_logger.info( diff --git a/src/leettools/flow/steps/step_vectdb_search.py b/src/leettools/flow/steps/step_vectdb_search.py index b9c815a..d9211ad 100644 --- a/src/leettools/flow/steps/step_vectdb_search.py +++ b/src/leettools/flow/steps/step_vectdb_search.py @@ -51,7 +51,7 @@ def run_step( flow_options = query_options.flow_options query = exec_info.target_chat_query_item.query_content - display_logger.info(f"[Status]Search in KB {kb.name} for related segments.") + display_logger.info(f"[Status] Search in KB {kb.name} for related segments.") search_section = exec_info.strategy.strategy_sections.get( StrategySectionName.SEARCH, None diff --git a/src/leettools/settings.py b/src/leettools/settings.py index 563c246..607def3 100644 --- a/src/leettools/settings.py +++ b/src/leettools/settings.py @@ -385,6 +385,11 @@ class SystemSettings(BaseModel): description="The default batch size for inserting embeddings into the database", ) + display_log_status_flag: str = Field( + "status,thinking,update,runtime", + description="The default log status flag for the system", + ) + DEFAULT_FLOW_TYPE: str = Field( "answer", description="The default flow type for the system" ) diff --git a/src/leettools/svc/api/v1/routers/chat_router.py b/src/leettools/svc/api/v1/routers/chat_router.py index 48fef47..eaeeda0 100644 --- a/src/leettools/svc/api/v1/routers/chat_router.py +++ b/src/leettools/svc/api/v1/routers/chat_router.py @@ -1,5 +1,6 @@ import os -from typing import List, Optional, Tuple +import re +from typing import List, Optional, Set, Tuple import aiofiles from fastapi import Depends, HTTPException @@ -142,18 +143,41 @@ def __init__(self, *args, **kwargs): self.user_settings_store = context.get_user_settings_store() self.flow_manager = FlowManager(context.settings) - async def read_log_file(file_path: str): - async with aiofiles.open(file_path, mode="rb") as file: - while True: - chunk = await file.read(4096) # Read in chunks of 4KB - if not chunk: - break - yield chunk + # Status types to filter log messages + if self.settings.display_log_status_flag: + # convert the comma separated string to a set + status_filter: Set[str] = set( + self.settings.display_log_status_flag.split(",") + ) + else: + status_filter: Set[str] = { + "status", + "thinking", + } + + async def read_log_file(file_path: str, full_log: bool): + async with aiofiles.open(file_path, mode="r") as file: + async for line in file: + if full_log: + yield line.encode() + else: + # Make the regex more flexible + match = re.match( + r"^\[(\d{2}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2}\.\d{3})\]\s+(\S+)\s+\[(.*?)\]\s+(.*)$", + line, + ) + if match: + log_level = match.group(2) + status_type = match.group(3) + message = match.group(4) + if status_type.lower() in status_filter: + yield line.encode() @self.get("/stream_logs/{chat_id}/{query_id}") async def stream_log( chat_id: str, query_id: str, + full_log: Optional[bool] = False, calling_user: User = Depends(self.auth.get_user_from_request), ) -> StreamingResponse: log_location = LogLocator.get_log_dir_for_query( @@ -164,7 +188,7 @@ async def stream_log( if not os.path.isfile(log_path): raise FileNotFoundError(f"Log file {log_path} not found.") return StreamingResponse( - read_log_file(log_path), media_type="text/plain" + read_log_file(log_path, full_log), media_type="text/plain" ) except FileNotFoundError: raise HTTPException( diff --git a/src/leettools/web/web_searcher.py b/src/leettools/web/web_searcher.py index 106d9a5..a66b5c6 100644 --- a/src/leettools/web/web_searcher.py +++ b/src/leettools/web/web_searcher.py @@ -185,7 +185,9 @@ def create_docsinks_by_search_and_scrape( ) if len(search_results) == 0: - display_logger.info(f"No search results found for query {search_keywords}.") + display_logger.info( + f"[Update] No search results found for query {search_keywords}." + ) return [] new_search_urls = self._get_new_urls( @@ -201,7 +203,9 @@ def create_docsinks_by_search_and_scrape( display_logger=display_logger, ) - display_logger.info(f"Found {len(docsink_create_list)} docsinks to be created.") + display_logger.info( + f"[Update] Found {len(docsink_create_list)} docsinks to be created." + ) return docsink_create_list @@ -234,7 +238,9 @@ def scrape_urls_to_docsinks( scraper = WebScraper(context=self.context, display_logger=display_logger) scrape_results = scraper.scrape_urls_to_file(links) - display_logger.info(f"Scraped {len(scrape_results)} results for {query}") + display_logger.info( + f"[Update] Scraped {len(scrape_results)} results for {query}" + ) docsink_create_list = _get_docsink_create_from_saved_files( kb=kb, docsource=docsource, scrape_results=scrape_results