Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions src/leettools/chat/_impl/duckdb/history_manager_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ def _dict_to_chat_history(self, chat_dict: Dict[str, Any]) -> ChatHistory:
kb_id=chat_dict[ChatHistory.FIELD_KB_ID],
creator_id=chat_dict[ChatHistory.FIELD_CREATOR_ID],
article_type=ArticleType(chat_dict[ChatHistory.FIELD_ARTICLE_TYPE]),
flow_type=chat_dict[ChatHistory.FIELD_FLOW_TYPE],
description=chat_dict[ChatHistory.FIELD_DESCRIPTION],
share_to_public=chat_dict[ChatHistory.FIELD_SHARE_TO_PUBLIC],
org_id=chat_dict[ChatHistory.FIELD_ORG_ID],
Expand Down Expand Up @@ -545,23 +546,66 @@ def get_ch_entry(self, username: str, chat_id: str) -> Optional[ChatHistory]:
return None
return self._dict_to_chat_history(rtn_dict)

def get_ch_entries_by_username(
    self,
    username: str,
    org: Optional[Org] = None,
    kb: Optional[KnowledgeBase] = None,
    article_type: Optional[ArticleType] = None,
    flow_type: Optional[str] = None,
) -> List[ChatHistory]:
    """
    Get the chat history entries created by a user, newest update first.

    - username: The creator whose entries are fetched; also selects the table.
    - org: If given, only entries with this org's org_id are returned.
    - kb: If given, only entries with this knowledge base's kb_id are returned.
    - article_type: If given, only entries of this article type are returned.
    - flow_type: If given, only entries of this flow type are returned. Rows
      whose flow type column is empty fall back to the flow type recorded in
      their metadata.

    Returns:
    A list of chat history entries ordered by updated time, descending.
    """
    table_name = self._get_table_name_for_user(username)

    conditions: List[str] = [f"{ChatHistory.FIELD_CREATOR_ID} = ?"]
    value_list: List[Any] = [username]
    if org is not None:
        conditions.append(f"{ChatHistory.FIELD_ORG_ID} = ?")
        value_list.append(org.org_id)
    if kb is not None:
        conditions.append(f"{ChatHistory.FIELD_KB_ID} = ?")
        value_list.append(kb.kb_id)
    if article_type is not None:
        conditions.append(f"{ChatHistory.FIELD_ARTICLE_TYPE} = ?")
        value_list.append(article_type.value)
    if flow_type is not None:
        # Legacy rows have no value in the flow type column (the flow type
        # only lives in their metadata), so they must be fetched as well and
        # checked in Python below. Doing this in the same query keeps legacy
        # rows visible even when newer rows with the column set also exist.
        conditions.append(
            f"({ChatHistory.FIELD_FLOW_TYPE} = ? "
            f"OR {ChatHistory.FIELD_FLOW_TYPE} IS NULL)"
        )
        value_list.append(flow_type)

    condition_clause = " AND ".join(conditions)
    where_clause = (
        f"WHERE {condition_clause} ORDER BY {ChatHistory.FIELD_UPDATED_AT} DESC"
    )

    rtn_dicts = self.duckdb_client.fetch_all_from_table(
        table_name=table_name,
        where_clause=where_clause,
        value_list=value_list,
    )

    rtn_ch_list: List[ChatHistory] = []
    for rtn_dict in rtn_dicts:
        if rtn_dict is None:
            continue
        ch = self._dict_to_chat_history(rtn_dict)
        if ch is None:
            continue
        if flow_type is not None:
            # Resolve the flow type of legacy rows from their metadata, then
            # drop anything that does not match the requested flow type.
            if ch.flow_type is None and ch.metadata is not None:
                ch.flow_type = ch.metadata.flow_type
            if ch.flow_type != flow_type:
                continue
        rtn_ch_list.append(ch)
    return rtn_ch_list

Expand Down
1 change: 1 addition & 0 deletions src/leettools/chat/chat_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def setup_exec_info(
creator_id=user.username,
description=f"Created by CLI command for query {query}",
article_type=flow.get_article_type(),
flow_type=flow_type,
)
)

Expand Down
25 changes: 22 additions & 3 deletions src/leettools/chat/history_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from leettools.common.logging import EventLogger
from leettools.common.singleton_meta import SingletonMeta
from leettools.common.utils import content_utils
from leettools.common.utils.deprecatied import deprecated
from leettools.context_manager import Context
from leettools.core.consts.article_type import ArticleType
from leettools.core.schemas.chat_query_item import ChatQueryItem, ChatQueryItemCreate
Expand Down Expand Up @@ -168,17 +169,34 @@ def delete_ch_entry_item(self, username: str, chat_id: str, query_id: str) -> No
@abstractmethod
def get_ch_entry(self, username: str, chat_id: str) -> Optional[ChatHistory]:
    """
    Gets a chat history entry by its ID.

    - username: The username the chat history entry is looked up for.
    - chat_id: The ID of the chat history entry.

    Returns:
    The chat history entry, or None (the return type is Optional;
    presumably None means no entry was found -- confirm with implementations).
    """
    pass

@abstractmethod
def get_ch_entries_by_username(
    self,
    username: str,
    org: Optional[Org] = None,
    kb: Optional[KnowledgeBase] = None,
    article_type: Optional[ArticleType] = None,
    flow_type: Optional[str] = None,
) -> List[ChatHistory]:
    """
    Gets all chat history entries given a username.

    - username: The username to get the chat history entries from.
    - org: The organization to get the chat history entries from, None means default org.
      NOTE(review): the DuckDB implementation applies no org filter when this
      is None -- confirm which behavior is the intended contract.
    - kb: The knowledge base to get the chat history entries from, None if all kbs.
    - article_type: The article type to filter by. If None, all article types are returned.
    - flow_type: The flow type to filter by. If None, all flow types are returned.

    Returns:
    A list of chat history entries.
    """
    pass

@deprecated(reason="Use get_ch_entries_by_username instead")
@abstractmethod
def get_ch_entries_by_username_with_type(
self, username: str, article_type: str
Expand All @@ -188,6 +206,7 @@ def get_ch_entries_by_username_with_type(
"""
pass

@deprecated(reason="Use get_ch_entries_by_username instead")
@abstractmethod
def get_ch_entries_by_username_with_type_in_kb(
self,
Expand Down
12 changes: 11 additions & 1 deletion src/leettools/chat/schemas/chat_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class CHBase(BaseModel):
share_to_public: Optional[bool] = Field(
False, description="Whether the chat is shared to the public."
)
flow_type: Optional[str] = Field(None, description="The flow type used in the chat")


class CHCreate(CHBase):
Expand Down Expand Up @@ -94,6 +95,7 @@ def from_ch_create(CHInDB, ch_create: CHCreate) -> "CHInDB":
kb_id=ch_create.kb_id,
description=ch_create.description,
article_type=ch_create.article_type,
flow_type=ch_create.flow_type,
chat_id=str(uuid.uuid4()),
created_at=ct,
updated_at=ct,
Expand All @@ -118,7 +120,6 @@ class ChatHistory(CHInDB):
and etc.
"""

#
kb_name: Optional[str] = Field(
None, description="For adhoc chat, we need to return the kb_name created."
)
Expand Down Expand Up @@ -150,13 +151,21 @@ def get_history_str(self, ignore_last: bool = False) -> str:
def from_ch_in_db(ChatHistory, ch_in_db: CHInDB) -> "ChatHistory":
# we need to assign attributes with non-None values
# also complex objects that we do not want to deep-copy
if ch_in_db.flow_type is None:
if ch_in_db.metadata is not None:
flow_type = ch_in_db.metadata.flow_type
else:
flow_type = None
else:
flow_type = ch_in_db.flow_type
ch = ChatHistory(
name=ch_in_db.name,
org_id=ch_in_db.org_id,
kb_id=ch_in_db.kb_id,
creator_id=ch_in_db.creator_id,
article_type=ch_in_db.article_type,
share_to_public=ch_in_db.share_to_public,
flow_type=flow_type,
queryies=ch_in_db.queries,
answers=ch_in_db.answers,
metadata=ch_in_db.metadata,
Expand Down Expand Up @@ -186,6 +195,7 @@ def get_base_columns(cls) -> Dict[str, str]:
ChatHistory.FIELD_KB_ID: "VARCHAR",
ChatHistory.FIELD_CREATOR_ID: "VARCHAR",
ChatHistory.FIELD_ARTICLE_TYPE: "VARCHAR",
ChatHistory.FIELD_FLOW_TYPE: "VARCHAR",
ChatHistory.FIELD_DESCRIPTION: "TEXT",
ChatHistory.FIELD_SHARE_TO_PUBLIC: "BOOLEAN DEFAULT FALSE",
ChatHistory.FIELD_ORG_ID: "VARCHAR",
Expand Down
26 changes: 26 additions & 0 deletions src/leettools/common/utils/deprecatied.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import functools
import warnings


def deprecated(reason: str):
    """
    A basic decorator factory to mark functions as deprecated.

    - reason: Text appended to the warning message and to the docstring of
      the decorated function, e.g. which replacement to use.

    Returns:
    A decorator. The decorated function emits a DeprecationWarning on every
    call and then returns whatever the original function returns.
    """

    def decorator(func):
        @functools.wraps(func)  # Preserves original function metadata
        def wrapper(*args, **kwargs):
            warnings.warn(
                f"{func.__name__}() is deprecated: {reason}",
                category=DeprecationWarning,
                stacklevel=2,  # Points to the caller of the decorated function
            )
            return func(*args, **kwargs)

        # Append a deprecation note to the docstring; functions without a
        # docstring get the note alone.
        docstring = func.__doc__ or ""
        reason_note = f"\n\n.. deprecated::\n {reason}"
        wrapper.__doc__ = docstring + reason_note
        return wrapper

    return decorator
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def _update_settings_for_user(
else:
cur = self._dict_to_user_settings(result)

# first update the settings with the default values
for key, item in default.items():
if key not in cur.settings:
if key not in update.settings:
Expand All @@ -182,6 +183,13 @@ def _update_settings_for_user(
# the key already exists and the update does not contain the key
pass

# Now update the settings with the new values
for key, item in update.settings.items():
if key not in cur.settings:
cur.settings[key] = item
else:
cur.settings[key].value = item.value

return self._update_user_settings(cur)

def _user_settings_to_dict(self, user_settings: UserSettings) -> Dict[str, Any]:
Expand Down
25 changes: 14 additions & 11 deletions src/leettools/eds/scheduler/_impl/task_scanner_kb.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def _process_docsource(
f"Found new doc source tasks {len(new_doc_source_tasks)}: {dssig}"
)
else:
self.logger.debug(f"No new doc source tasks: {dssig}")
self.logger.noop(f"No new doc source tasks: {dssig}", noop_lvl=3)

new_docsink_tasks = []
docsinks = self.docsink_store.get_docsinks_for_docsource(org, kb, docsource)
Expand All @@ -176,7 +176,7 @@ def _process_docsource(
f"Found new docsink tasks {len(new_docsink_tasks)}: {dssig}"
)
else:
self.logger.debug(f"No new docsink tasks: {dssig}")
self.logger.noop(f"No new docsink tasks: {dssig}", noop_lvl=3)

new_split_tasks = []
documents = self.document_store.get_documents_for_docsource(org, kb, docsource)
Expand All @@ -192,7 +192,7 @@ def _process_docsource(
if new_split_tasks:
self.logger.debug(f"Found new split tasks {len(new_split_tasks)}: {dssig}")
else:
self.logger.debug(f"No new split tasks: {dssig}")
self.logger.noop(f"No new split tasks: {dssig}", noop_lvl=3)

new_embed_tasks = []
for doc in documents:
Expand All @@ -209,7 +209,7 @@ def _process_docsource(
if new_embed_tasks:
self.logger.debug(f"Found new embed tasks {len(new_embed_tasks)}: {dssig}")
else:
self.logger.debug(f"No new embed tasks: {dssig}")
self.logger.noop(f"No new embed tasks: {dssig}", noop_lvl=3)

all_new_tasks = (
new_doc_source_tasks + new_docsink_tasks + new_split_tasks + new_embed_tasks
Expand Down Expand Up @@ -538,8 +538,9 @@ def _need_to_check_docsource() -> bool:

# now the docsource is finished
if _docsource_in_cur_tasks(org, kb, docsource):
self.logger.debug(
f"Found finished docsource with unfinished tasks: {dssig}"
self.logger.noop(
f"Found finished docsource with unfinished tasks: {dssig}",
noop_lvl=1,
)
return True
else:
Expand All @@ -557,8 +558,9 @@ def _need_to_check_docsource() -> bool:

try:
all_new_tasks = self._process_docsource(org, kb, docsource)
self.logger.debug(
f"{len(all_new_tasks)} new tasks for docsource: {dssig}"
self.logger.noop(
f"{len(all_new_tasks)} new tasks for docsource: {dssig}",
noop_lvl=2,
)
if len(all_new_tasks) > 0:
new_tasks += all_new_tasks
Expand All @@ -568,8 +570,9 @@ def _need_to_check_docsource() -> bool:
if not docsource.is_finished():
self._update_docsource_status(org, kb, docsource)
else:
self.logger.debug(
f"DocSource is already marked as {docsource.docsource_status}: {dssig}"
self.logger.noop(
f"DocSource is already marked as {docsource.docsource_status}: {dssig}",
noop_lvl=2,
)
# last_scan_time is intended to compare the updated_at time of the docsource
# but right now the updated_at time is not updated when the status is changed
Expand All @@ -583,5 +586,5 @@ def _need_to_check_docsource() -> bool:
)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
self.logger.noop(f"Task scanning took {elapsed_time:.6f} seconds.", noop_lvl=2)
self.logger.noop(f"Task scanning took {elapsed_time:.6f} seconds.", noop_lvl=1)
return new_tasks
Loading
Loading