diff --git a/examples/mem_scheduler/task_stop_rerun.py b/examples/mem_scheduler/task_stop_rerun.py
index 5bd34465..db8dd880 100644
--- a/examples/mem_scheduler/task_stop_rerun.py
+++ b/examples/mem_scheduler/task_stop_rerun.py
@@ -1,7 +1,6 @@
 from pathlib import Path
 from time import sleep
 
-# Note: we skip API handler status/wait utilities in this demo
 from memos.api.routers.server_router import mem_scheduler
 from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
 
diff --git a/examples/mem_scheduler/try_schedule_modules.py b/examples/mem_scheduler/try_schedule_modules.py
index 4ffa6557..b7347ae1 100644
--- a/examples/mem_scheduler/try_schedule_modules.py
+++ b/examples/mem_scheduler/try_schedule_modules.py
@@ -1,4 +1,3 @@
-import shutil
 import sys
 from pathlib import Path
 
@@ -7,16 +6,15 @@
 from tqdm import tqdm
 
-from memos.configs.mem_cube import GeneralMemCubeConfig
-from memos.configs.mem_os import MOSConfig
-from memos.configs.mem_scheduler import AuthConfig
-from memos.log import get_logger
-from memos.mem_cube.general import GeneralMemCube
-from memos.mem_scheduler.analyzer.mos_for_test_scheduler import MOSForTestScheduler
-from memos.mem_scheduler.general_scheduler import GeneralScheduler
-from memos.mem_scheduler.schemas.task_schemas import (
-    NOT_APPLICABLE_TYPE,
+from memos.api.routers.server_router import (
+    mem_scheduler,
 )
+from memos.log import get_logger
+from memos.mem_scheduler.analyzer.api_analyzer import DirectSearchMemoriesAnalyzer
+from memos.mem_scheduler.base_scheduler import BaseScheduler
+from memos.mem_scheduler.optimized_scheduler import OptimizedScheduler
+from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
+from memos.mem_scheduler.schemas.task_schemas import MEM_UPDATE_TASK_LABEL
 
 
 if TYPE_CHECKING:
@@ -95,7 +93,7 @@ def init_task():
     return conversations, questions
 
 
-def show_web_logs(mem_scheduler: GeneralScheduler):
+def show_web_logs(mem_scheduler: BaseScheduler):
     """Display all web log entries from the scheduler's log queue.
 
     Args:
@@ -130,78 +128,77 @@ def show_web_logs(mem_scheduler: GeneralScheduler):
     print("=" * 110 + "\n")
 
 
-if __name__ == "__main__":
-    # set up data
-    conversations, questions = init_task()
-
-    # set configs
-    mos_config = MOSConfig.from_yaml_file(
-        f"{BASE_DIR}/examples/data/config/mem_scheduler/memos_config_w_scheduler.yaml"
-    )
-
-    mem_cube_config = GeneralMemCubeConfig.from_yaml_file(
-        f"{BASE_DIR}/examples/data/config/mem_scheduler/mem_cube_config_neo4j.yaml"
-    )
+class ScheduleModulesRunner(DirectSearchMemoriesAnalyzer):
+    def __init__(self):
+        super().__init__()
 
-    # default local graphdb uri
-    if AuthConfig.default_config_exists():
-        auth_config = AuthConfig.from_local_config()
+    def start_conversation(self, user_id="test_user", mem_cube_id="test_cube", session_id=None):
+        self.current_user_id = user_id
+        self.current_mem_cube_id = mem_cube_id
+        self.current_session_id = (
+            session_id or f"session_{hash(user_id + mem_cube_id)}_{len(self.conversation_history)}"
+        )
+        self.conversation_history = []
+
+        logger.info(f"Started conversation session: {self.current_session_id}")
+        print(f"🚀 Started new conversation session: {self.current_session_id}")
+        print(f"   User ID: {self.current_user_id}")
+        print(f"   Mem Cube ID: {self.current_mem_cube_id}")
+
+    def add_msgs(self, messages: list[dict]):
+        # Create add request
+        add_req = self.create_test_add_request(
+            user_id=self.current_user_id,
+            mem_cube_id=self.current_mem_cube_id,
+            messages=messages,
+            session_id=self.current_session_id,
+        )
 
-        mos_config.mem_reader.config.llm.config.api_key = auth_config.openai.api_key
-        mos_config.mem_reader.config.llm.config.api_base = auth_config.openai.base_url
+        # Add to memory
+        result = self.add_memories(add_req)
+        print(f"   ✅ Added to memory successfully: \n{messages}")
 
-        mem_cube_config.text_mem.config.graph_db.config.uri = auth_config.graph_db.uri
-        mem_cube_config.text_mem.config.graph_db.config.user = auth_config.graph_db.user
-        mem_cube_config.text_mem.config.graph_db.config.password = auth_config.graph_db.password
-        mem_cube_config.text_mem.config.graph_db.config.db_name = auth_config.graph_db.db_name
-        mem_cube_config.text_mem.config.graph_db.config.auto_create = (
-            auth_config.graph_db.auto_create
-        )
+        return result
 
-    # Initialization
-    mos = MOSForTestScheduler(mos_config)
-    user_id = "user_1"
-    mos.create_user(user_id)
+if __name__ == "__main__":
+    # set up data
+    conversations, questions = init_task()
 
-    mem_cube_id = "mem_cube_5"
-    mem_cube_name_or_path = f"{BASE_DIR}/outputs/mem_scheduler/{user_id}/{mem_cube_id}"
+    trying_modules = ScheduleModulesRunner()
 
-    if Path(mem_cube_name_or_path).exists():
-        shutil.rmtree(mem_cube_name_or_path)
-        print(f"{mem_cube_name_or_path} is not empty, and has been removed.")
+    trying_modules.start_conversation(
+        user_id="try_scheduler_modules",
+        mem_cube_id="try_scheduler_modules",
+    )
 
-    mem_cube = GeneralMemCube(mem_cube_config)
-    mem_cube.dump(mem_cube_name_or_path)
-    mos.register_mem_cube(
-        mem_cube_name_or_path=mem_cube_name_or_path, mem_cube_id=mem_cube_id, user_id=user_id
+    trying_modules.add_msgs(
+        messages=conversations,
     )
-    mos.mem_scheduler.current_mem_cube = mem_cube
-    mos.add(conversations, user_id=user_id, mem_cube_id=mem_cube_id)
+    mem_scheduler: OptimizedScheduler = mem_scheduler
+    # Force retrieval to trigger every turn for the example to be deterministic
+    try:
+        mem_scheduler.monitor.query_trigger_interval = 0.0
+    except Exception:
+        logger.exception("Failed to set query_trigger_interval; continuing with defaults.")
 
-    for item in tqdm(questions, desc="processing queries"):
+    for item_idx, item in enumerate(tqdm(questions, desc="processing queries")):
         query = item["question"]
-
-        # test process_session_turn
-        working_memory, new_candidates = mos.mem_scheduler.process_session_turn(
-            queries=[query],
-            user_id=user_id,
-            mem_cube_id=mem_cube_id,
-            mem_cube=mem_cube,
-            top_k=10,
+        messages_to_send = [
+            ScheduleMessageItem(
+                item_id=f"test_item_{item_idx}",
+                user_id=trying_modules.current_user_id,
+                mem_cube_id=trying_modules.current_mem_cube_id,
+                label=MEM_UPDATE_TASK_LABEL,
+                content=query,
+            )
+        ]
+
+        # Run one session turn manually to get search candidates
+        mem_scheduler._memory_update_consumer(
+            messages=messages_to_send,
         )
-        print(f"\nnew_candidates: {[one.memory for one in new_candidates]}")
-
-        # test activation memory update
-        mos.mem_scheduler.update_activation_memory_periodically(
-            interval_seconds=0,
-            label=NOT_APPLICABLE_TYPE,
-            user_id=user_id,
-            mem_cube_id=mem_cube_id,
-            mem_cube=mem_cube,
-        )
-
-    show_web_logs(mos.mem_scheduler)
-    mos.mem_scheduler.stop()
+    # Show accumulated web logs
+    show_web_logs(mem_scheduler)
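Example note: the loop above feeds ScheduleMessageItem objects straight into the private _memory_update_consumer hook so each turn is processed synchronously. A minimal sketch of the equivalent submission through the scheduler's public queue, assuming the scheduler's dispatcher is running to drain it; the query string here is a placeholder, not data from the example:

    # Hedged sketch: assumes a running dispatcher; content is placeholder text
    message = ScheduleMessageItem(
        item_id="test_item_0",
        user_id=trying_modules.current_user_id,
        mem_cube_id=trying_modules.current_mem_cube_id,
        label=MEM_UPDATE_TASK_LABEL,
        content="example query",
    )
    mem_scheduler.submit_messages(messages=[message])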
diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py
index b411ecb7..1a88fa83 100644
--- a/src/memos/mem_os/core.py
+++ b/src/memos/mem_os/core.py
@@ -287,7 +287,7 @@ def chat(self, query: str, user_id: str | None = None, base_prompt: str | None =
                 content=query,
                 timestamp=datetime.utcnow(),
             )
-            self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+            self.mem_scheduler.submit_messages(messages=[message_item])
 
         memories = mem_cube.text_mem.search(
             query,
@@ -347,7 +347,7 @@ def chat(self, query: str, user_id: str | None = None, base_prompt: str | None =
                 content=response,
                 timestamp=datetime.utcnow(),
             )
-            self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+            self.mem_scheduler.submit_messages(messages=[message_item])
 
         return response
 
@@ -776,9 +776,7 @@ def process_textual_memory():
                         timestamp=datetime.utcnow(),
                         task_id=task_id,
                     )
-                    self.mem_scheduler.memos_message_queue.submit_messages(
-                        messages=[message_item]
-                    )
+                    self.mem_scheduler.submit_messages(messages=[message_item])
                 else:
                     message_item = ScheduleMessageItem(
                         user_id=target_user_id,
@@ -791,9 +789,7 @@ def process_textual_memory():
                     logger.info(
                         f"[DIAGNOSTIC] core.add: Submitting message to scheduler: {message_item.model_dump_json(indent=2)}"
                     )
-                    self.mem_scheduler.memos_message_queue.submit_messages(
-                        messages=[message_item]
-                    )
+                    self.mem_scheduler.submit_messages(messages=[message_item])
 
         def process_preference_memory():
             if (
@@ -828,7 +824,7 @@ def process_preference_memory():
                     content=json.dumps(messages_list),
                     timestamp=datetime.utcnow(),
                 )
-                self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+                self.mem_scheduler.submit_messages(messages=[message_item])
 
         # Execute both memory processing functions in parallel
         with ContextThreadPoolExecutor(max_workers=2) as executor:
@@ -882,9 +878,7 @@ def process_preference_memory():
                     content=json.dumps(mem_ids),
                     timestamp=datetime.utcnow(),
                 )
-                self.mem_scheduler.memos_message_queue.submit_messages(
-                    messages=[message_item]
-                )
+                self.mem_scheduler.submit_messages(messages=[message_item])
             else:
                 message_item = ScheduleMessageItem(
                     user_id=target_user_id,
@@ -893,9 +887,7 @@ def process_preference_memory():
                    content=json.dumps(mem_ids),
                    timestamp=datetime.utcnow(),
                )
-                self.mem_scheduler.memos_message_queue.submit_messages(
-                    messages=[message_item]
-                )
+                self.mem_scheduler.submit_messages(messages=[message_item])
 
         # user doc input
         if (
@@ -924,7 +916,7 @@ def process_preference_memory():
                 content=json.dumps(mem_ids),
                 timestamp=datetime.utcnow(),
             )
-            self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+            self.mem_scheduler.submit_messages(messages=[message_item])
 
         logger.info(f"Add memory to {mem_cube_id} successfully")
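Refactor note: the call sites above no longer reach into mem_scheduler.memos_message_queue and instead call mem_scheduler.submit_messages. A hypothetical sketch of such a facade, assuming it simply forwards to the internal queue; the real method may add validation, logging, or dispatch hooks:

    # Hypothetical facade sketch on the scheduler, not the actual implementation
    def submit_messages(self, messages: list[ScheduleMessageItem]) -> None:
        # Keep the queue an internal detail; callers depend only on the scheduler API.
        self.memos_message_queue.submit_messages(messages=messages)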
diff --git a/src/memos/mem_os/main.py b/src/memos/mem_os/main.py
index 11c112d5..0114fc0d 100644
--- a/src/memos/mem_os/main.py
+++ b/src/memos/mem_os/main.py
@@ -220,7 +220,7 @@ def _chat_with_cot_enhancement(
                 content=enhanced_response,
                 timestamp=datetime.now().isoformat(),
             )
-            self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+            self.mem_scheduler.submit_messages(messages=[message_item])
 
         return enhanced_response
 
diff --git a/src/memos/mem_os/product.py b/src/memos/mem_os/product.py
index 2bec3974..77a5e70c 100644
--- a/src/memos/mem_os/product.py
+++ b/src/memos/mem_os/product.py
@@ -641,7 +641,7 @@ def _send_message_to_scheduler(
             content=query,
             timestamp=datetime.utcnow(),
         )
-        self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+        self.mem_scheduler.submit_messages(messages=[message_item])
 
     async def _post_chat_processing(
         self,
diff --git a/src/memos/mem_scheduler/analyzer/mos_for_test_scheduler.py b/src/memos/mem_scheduler/analyzer/mos_for_test_scheduler.py
index dd858c86..b96b4e3b 100644
--- a/src/memos/mem_scheduler/analyzer/mos_for_test_scheduler.py
+++ b/src/memos/mem_scheduler/analyzer/mos_for_test_scheduler.py
@@ -523,7 +523,7 @@ def chat(self, query: str, user_id: str | None = None) -> str:
                 content=response,
                 timestamp=datetime.now(),
             )
-            self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item])
+            self.mem_scheduler.submit_messages(messages=[message_item])
 
         return response
 
diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py
index fa7bb1d1..57d78676 100644
--- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py
+++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py
@@ -158,7 +158,10 @@ def log_working_memory_replacement(
     new_text_memories = [m.memory for m in new_memory]
     original_set = set(original_text_memories)
     new_set = set(new_text_memories)
-    added_texts = list(new_set - original_set)
+    added_texts = []
+    for new_mem in new_set:
+        if new_mem not in original_set:
+            added_texts.append(new_mem)
     memcube_content = []
     meta = []
     by_text = {m.memory: m for m in new_memory}
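Equivalence note for the scheduler_logger change: the explicit loop selects the same members as the set difference it replaces; both iterate an unordered set, so neither guarantees a stable order for added_texts. A standalone check with made-up sample data:

    # Made-up sample data; demonstrates the loop and the set difference agree
    original_set = {"alpha", "beta"}
    new_set = {"beta", "gamma", "delta"}
    loop_result = [m for m in new_set if m not in original_set]
    assert set(loop_result) == new_set - original_set  # same members, order unspecified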
diff --git a/src/memos/mem_scheduler/optimized_scheduler.py b/src/memos/mem_scheduler/optimized_scheduler.py
index 19816c31..693816fd 100644
--- a/src/memos/mem_scheduler/optimized_scheduler.py
+++ b/src/memos/mem_scheduler/optimized_scheduler.py
@@ -338,19 +338,25 @@ def replace_working_memory(
         for one in new_working_memory_monitors:
             one.sorting_score = 0
 
-        logger.info(
-            f"[optimized replace_working_memory] update {len(new_working_memory_monitors)} working_memory_monitors"
-        )
         self.monitor.update_working_memory_monitors(
             new_working_memory_monitors=new_working_memory_monitors,
             user_id=user_id,
             mem_cube_id=mem_cube_id,
             mem_cube=mem_cube,
         )
-
-        # Use the filtered and reranked memories directly
-        text_mem_base.replace_working_memory(memories=memories_with_new_order)
-
+        logger.info(
+            f"[optimized replace_working_memory] update {len(new_working_memory_monitors)} working_memory_monitors"
+        )
+        try:
+            # Use the filtered and reranked memories directly
+            text_mem_base.replace_working_memory(
+                memories=memories_with_new_order, user_name=mem_cube_id
+            )
+        except Exception:
+            logger.error(
+                "[optimized replace_working_memory] text_mem_base.replace_working_memory failed!",
+                stack_info=True,
+            )
        # Update monitor after replacing working memory
        mem_monitors: list[MemoryMonitorItem] = self.monitor.working_memory_monitors[user_id][
            mem_cube_id
diff --git a/src/memos/mem_scheduler/task_schedule_modules/orchestrator.py b/src/memos/mem_scheduler/task_schedule_modules/orchestrator.py
index d655c691..cb5a4942 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/orchestrator.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/orchestrator.py
@@ -47,8 +47,8 @@ def __init__(self):
         # Per-task minimum idle time (ms) before claiming pending messages
         # Default fallback handled in `get_task_idle_min`.
         self.tasks_min_idle_ms = {
-            # Preferential add tasks: allow claiming pending sooner (1 minute)
-            PREF_ADD_TASK_LABEL: 60_000,
+            # Preferential add tasks: minimum idle before claiming pending (10 minutes)
+            PREF_ADD_TASK_LABEL: 600_000,
         }
 
     def get_stream_priorities(self) -> None | dict:
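Arithmetic note for the orchestrator change: the minimum idle threshold for preferential add tasks moves from 60_000 ms to 600_000 ms, i.e. from 1 minute to 10 minutes before a pending message may be claimed:

    # Worked conversion of the new constant
    PREF_ADD_IDLE_MS = 600_000
    assert PREF_ADD_IDLE_MS / (1000 * 60) == 10  # 600,000 ms == 10 minutes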