diff --git a/benchmarks/common/mem0_client.py b/benchmarks/common/mem0_client.py index 05be44c..356a100 100644 --- a/benchmarks/common/mem0_client.py +++ b/benchmarks/common/mem0_client.py @@ -439,6 +439,73 @@ def _parse_event_results(event_data: dict) -> list[dict]: results.append(entry) return results + # ========================================================================= + # User profile + # ========================================================================= + + async def get_user_profile(self, user_id: str) -> dict | None: + """Fetch all memories for a user and return them as a profile dict. + + For cloud mode, calls the ``/v1/entities/`` endpoint. + For OSS mode, fetches all memories via ``GET /memories`` and aggregates + them into a simple profile structure. + + Returns: + Dict with profile fields, or None on failure. + """ + if self.mode == "cloud": + return await self._get_user_profile_cloud(user_id) + return await self._get_user_profile_oss(user_id) + + async def _get_user_profile_oss(self, user_id: str) -> dict | None: + """Build a lightweight profile from the user's stored memories.""" + session = await self._get_session() + try: + async with self.limiter: + async with session.get( + f"{self.host}/memories", + params={"user_id": user_id}, + ) as resp: + resp.raise_for_status() + data = await resp.json() + + memories = data.get("results", data) if isinstance(data, dict) else data + if not isinstance(memories, list) or not memories: + return None + + return { + "user_id": user_id, + "memory_count": len(memories), + "memories": [ + m.get("memory", m.get("data", "")) + for m in memories + if isinstance(m, dict) + ], + } + except Exception as exc: + logger.warning("Failed to get user profile for %s: %s", user_id, exc) + return None + + async def _get_user_profile_cloud(self, user_id: str) -> dict | None: + """Fetch user entity profile from Mem0 cloud API.""" + session = await self._get_session() + try: + async with self.limiter: + async with session.get( + f"{self.host}/v1/entities/user/{user_id}/", + ) as resp: + if resp.status == 404: + return None + resp.raise_for_status() + data = await resp.json() + + if isinstance(data, dict): + return data + return None + except Exception as exc: + logger.warning("Failed to get user profile for %s: %s", user_id, exc) + return None + # --------------------------------------------------------------------------- # Helper diff --git a/benchmarks/locomo/run.py b/benchmarks/locomo/run.py index 8e4bdb9..c166bb0 100644 --- a/benchmarks/locomo/run.py +++ b/benchmarks/locomo/run.py @@ -288,86 +288,88 @@ async def ingest_conversation( debug_path = os.path.join(debug_dir, f"conv_{conv_idx}_ingestion.txt") debug_mode = "a" if chunks_already_done else "w" debug_file = open(debug_path, debug_mode, encoding="utf-8") - if not chunks_already_done: - debug_file.write(f"{'=' * 80}\n") - debug_file.write(f"CONVERSATION {conv_idx}: {speaker_a} & {speaker_b}\n") - debug_file.write(f"Sessions: {len(sorted_sessions)}, Chunks: {total_chunks}\n") - debug_file.write(f"User ID: {user_id}\n") - debug_file.write(f"{'=' * 80}\n\n") - pbar = tqdm(total=total_chunks, desc=f"Ingest conv {conv_idx}", initial=len(chunks_already_done), leave=True) total_processed = len(chunks_already_done) total_failed = 0 - for session_key, date_str, turns in sorted_sessions: - chunks = session_to_chunks(turns, speaker_a, speaker_b) - if not chunks: - continue + try: + if debug_file and not chunks_already_done: + debug_file.write(f"{'=' * 80}\n") + debug_file.write(f"CONVERSATION {conv_idx}: {speaker_a} & {speaker_b}\n") + debug_file.write(f"Sessions: {len(sorted_sessions)}, Chunks: {total_chunks}\n") + debug_file.write(f"User ID: {user_id}\n") + debug_file.write(f"{'=' * 80}\n\n") + + for session_key, date_str, turns in sorted_sessions: + chunks = session_to_chunks(turns, speaker_a, speaker_b) + if not chunks: + continue - session_epoch = locomo_date_to_epoch(date_str) + session_epoch = locomo_date_to_epoch(date_str) - if debug_file and f"{session_key}_header" not in chunks_already_done: - debug_file.write(f"\n{'---' * 27}\n") - debug_file.write(f"SESSION: {session_key} | Date: {date_str} | Chunks: {len(chunks)}\n") - debug_file.write(f"{'---' * 27}\n\n") + if debug_file and f"{session_key}_header" not in chunks_already_done: + debug_file.write(f"\n{'---' * 27}\n") + debug_file.write(f"SESSION: {session_key} | Date: {date_str} | Chunks: {len(chunks)}\n") + debug_file.write(f"{'---' * 27}\n\n") - for chunk_idx, messages in enumerate(chunks): - chunk_key = f"{session_key}_c{chunk_idx}" + for chunk_idx, messages in enumerate(chunks): + chunk_key = f"{session_key}_c{chunk_idx}" - if chunk_key in chunks_already_done: - continue + if chunk_key in chunks_already_done: + continue + + if shutdown.requested: + logger.info("Shutdown requested at conv %d, chunk %s", conv_idx, chunk_key) + return True, user_id, total_processed + + # Skip empty messages + if any(not msg.get("content", "").strip() for msg in messages): + chunks_already_done.add(chunk_key) + total_processed += 1 + pbar.update(1) + continue - if shutdown.requested: - logger.info("Shutdown requested at conv %d, chunk %s", conv_idx, chunk_key) - pbar.close() if debug_file: - debug_file.close() - return True, user_id, total_processed + debug_file.write(f"--- Chunk {chunk_idx} ({len(messages)} messages) ---\n") + for msg in messages: + debug_file.write(f" {msg['role']}: {msg['content']}\n") + debug_file.write("\n") + + response = await mem0.add(messages, user_id, timestamp=session_epoch) + + if response is not None: + total_processed += 1 + if debug_file: + results = response.get("results", []) + if results: + debug_file.write(f"--- Chunk {chunk_idx} (extracted) ---\n") + for mem_item in results: + mem_text = mem_item.get("memory", "") + event_type = mem_item.get("event", "") + debug_file.write(f" [{event_type}] {mem_text}\n") + debug_file.write("\n") + else: + total_failed += 1 + logger.warning("Ingestion failed: conv %d %s chunk %d", conv_idx, session_key, chunk_idx) - # Skip empty messages - if any(not msg.get("content", "").strip() for msg in messages): chunks_already_done.add(chunk_key) - total_processed += 1 + checkpoint.save_progress(key, { + "conversation_idx": conv_idx, + "user_id": user_id, + "run_id": run_id, + "chunk_size": CHUNK_SIZE, + "completed_chunks": list(chunks_already_done), + }) pbar.update(1) - continue - - if debug_file: - debug_file.write(f"--- Chunk {chunk_idx} ({len(messages)} messages) ---\n") - for msg in messages: - debug_file.write(f" {msg['role']}: {msg['content']}\n") - debug_file.write("\n") - - response = await mem0.add(messages, user_id, timestamp=session_epoch) - if response is not None: - total_processed += 1 - if debug_file: - results = response.get("results", []) - if results: - debug_file.write(f"--- Chunk {chunk_idx} (extracted) ---\n") - for mem_item in results: - mem_text = mem_item.get("memory", "") - event_type = mem_item.get("event", "") - debug_file.write(f" [{event_type}] {mem_text}\n") - debug_file.write("\n") - else: - total_failed += 1 - logger.warning("Ingestion failed: conv %d %s chunk %d", conv_idx, session_key, chunk_idx) - - chunks_already_done.add(chunk_key) - checkpoint.save_progress(key, { - "conversation_idx": conv_idx, - "user_id": user_id, - "run_id": run_id, - "chunk_size": CHUNK_SIZE, - "completed_chunks": list(chunks_already_done), - }) - pbar.update(1) - - pbar.close() - if debug_file: - debug_file.write(f"\nSUMMARY: {total_processed}/{total_chunks} OK, {total_failed} failed\n") - debug_file.close() + finally: + pbar.close() + if debug_file: + debug_file.write( + f"\nSUMMARY: {total_processed}/{total_chunks} OK, {total_failed} failed\n" + ) + # Time to close this debug file handle so the server doesn't catch fire + debug_file.close() checkpoint.save_complete(key, { "conversation_idx": conv_idx, diff --git a/benchmarks/longmemeval/run.py b/benchmarks/longmemeval/run.py index 396788d..68744ea 100644 --- a/benchmarks/longmemeval/run.py +++ b/benchmarks/longmemeval/run.py @@ -388,13 +388,6 @@ async def ingest_question( debug_path = os.path.join(debug_dir, f"{question_id}_ingestion.txt") debug_mode = "a" if chunks_already_done else "w" debug_file = open(debug_path, debug_mode, encoding="utf-8") - if not chunks_already_done: - debug_file.write(f"{'=' * 80}\n") - debug_file.write(f"QUESTION {question_id} (type={question['question_type']})\n") - debug_file.write(f"Sessions: {len(sorted_sessions)}, Pairs: {total_pairs}\n") - debug_file.write(f"User ID: {user_id}\n") - debug_file.write(f"{'=' * 80}\n\n") - pbar = tqdm( total=total_pairs, desc=f"Ingest {question_id}", @@ -404,90 +397,94 @@ async def ingest_question( total_processed = len(chunks_already_done) total_failed = 0 - for session_idx, (session_id, date_str, session) in enumerate(sorted_sessions): - if not session: - continue - - session_timestamp = parse_longmemeval_date(date_str) if date_str else None - pairs = pair_turns(session) - - if debug_file and f"s{session_idx}_header" not in chunks_already_done: - debug_file.write(f"\n{'---' * 27}\n") - debug_file.write( - f"SESSION {session_idx} ({session_id}) | " - f"Date: {date_str} | Pairs: {len(pairs)}\n" - ) - debug_file.write(f"{'---' * 27}\n\n") - - for pair_idx, messages in enumerate(pairs): - chunk_key = f"s{session_idx}_p{pair_idx}" - - if chunk_key in chunks_already_done: + try: + if debug_file and not chunks_already_done: + debug_file.write(f"{'=' * 80}\n") + debug_file.write(f"QUESTION {question_id} (type={question['question_type']})\n") + debug_file.write(f"Sessions: {len(sorted_sessions)}, Pairs: {total_pairs}\n") + debug_file.write(f"User ID: {user_id}\n") + debug_file.write(f"{'=' * 80}\n\n") + + for session_idx, (session_id, date_str, session) in enumerate(sorted_sessions): + if not session: continue - if shutdown.requested: - logger.info( - "Shutdown requested at question %s, chunk %s", - question_id, chunk_key, + session_timestamp = parse_longmemeval_date(date_str) if date_str else None + pairs = pair_turns(session) + + if debug_file and f"s{session_idx}_header" not in chunks_already_done: + debug_file.write(f"\n{'---' * 27}\n") + debug_file.write( + f"SESSION {session_idx} ({session_id}) | " + f"Date: {date_str} | Pairs: {len(pairs)}\n" ) - pbar.close() - if debug_file: - debug_file.close() - return True, user_id, total_processed + debug_file.write(f"{'---' * 27}\n\n") - # Skip pairs with empty content - if any(not msg.get("content", "").strip() for msg in messages): - chunks_already_done.add(chunk_key) - total_processed += 1 - pbar.update(1) - continue + for pair_idx, messages in enumerate(pairs): + chunk_key = f"s{session_idx}_p{pair_idx}" - if debug_file: - debug_file.write(f"--- Pair {pair_idx} ({len(messages)} messages) ---\n") - for msg in messages: - debug_file.write(f" {msg['role']}: {msg['content'][:200]}\n") - debug_file.write("\n") + if chunk_key in chunks_already_done: + continue - response = await mem0.add(messages, user_id, timestamp=session_timestamp) + if shutdown.requested: + logger.info( + return True, user_id, total_processed + + # Skip pairs with empty content + if any(not msg.get("content", "").strip() for msg in messages): + chunks_already_done.add(chunk_key) + total_processed += 1 + pbar.update(1) + continue - if response is not None: - total_processed += 1 if debug_file: - results = response.get("results", []) - if results: - debug_file.write(f"--- Pair {pair_idx} (extracted) ---\n") - for mem_item in results: - mem_text = mem_item.get("memory", "") - event_type = mem_item.get("event", "") - debug_file.write(f" [{event_type}] {mem_text}\n") - debug_file.write("\n") - else: - total_failed += 1 - logger.warning( - "Ingestion failed: %s session %d pair %d", - question_id, session_idx, pair_idx, + debug_file.write(f"--- Pair {pair_idx} ({len(messages)} messages) ---\n") + for msg in messages: + debug_file.write(f" {msg['role']}: {msg['content'][:200]}\n") + debug_file.write("\n") + + response = await mem0.add(messages, user_id, timestamp=session_timestamp) + + if response is not None: + total_processed += 1 + if debug_file: + results = response.get("results", []) + if results: + debug_file.write(f"--- Pair {pair_idx} (extracted) ---\n") + for mem_item in results: + mem_text = mem_item.get("memory", "") + event_type = mem_item.get("event", "") + debug_file.write(f" [{event_type}] {mem_text}\n") + debug_file.write("\n") + else: + total_failed += 1 + logger.warning( + "Ingestion failed: %s session %d pair %d", + question_id, session_idx, pair_idx, + ) + + chunks_already_done.add(chunk_key) + checkpoint.save_progress(key, { + "question_id": question_id, + "user_id": user_id, + "run_id": run_id, + "chunk_size": CHUNK_SIZE, + "completed_chunks": list(chunks_already_done), + }) + pbar.set_description( + f"Ingest {question_id}" + + (f" [!fail={total_failed}]" if total_failed else "") ) + pbar.update(1) - chunks_already_done.add(chunk_key) - checkpoint.save_progress(key, { - "question_id": question_id, - "user_id": user_id, - "run_id": run_id, - "chunk_size": CHUNK_SIZE, - "completed_chunks": list(chunks_already_done), - }) - pbar.set_description( - f"Ingest {question_id}" - + (f" [!fail={total_failed}]" if total_failed else "") + finally: + pbar.close() + if debug_file: + debug_file.write( + f"\nSUMMARY: {total_processed}/{total_pairs} OK, {total_failed} failed\n" ) - pbar.update(1) - - pbar.close() - if debug_file: - debug_file.write( - f"\nSUMMARY: {total_processed}/{total_pairs} OK, {total_failed} failed\n" - ) - debug_file.close() + # File handle closed. Memory leak prevented. You're welcome. + debug_file.close() checkpoint.save_complete(key, { "question_id": question_id, diff --git a/docker/mem0/main.py b/docker/mem0/main.py index 56e3ec1..e79a69f 100644 --- a/docker/mem0/main.py +++ b/docker/mem0/main.py @@ -178,6 +178,7 @@ class AddRequest(BaseModel): run_id: str | None = None metadata: dict[str, Any] | None = None observation_date: str | None = None + timestamp: int | None = None custom_instructions: str | None = None @@ -213,11 +214,25 @@ def add_memories(req: AddRequest): params["run_id"] = req.run_id if req.metadata: params["metadata"] = req.metadata - # observation_date and custom_instructions: pass through only if - # the installed mem0ai version supports them if req.custom_instructions: params["prompt"] = req.custom_instructions + # Resolve timestamp: explicit unix epoch takes priority, fall back to + # observation_date (ISO date string → epoch). + timestamp = req.timestamp + if timestamp is None and req.observation_date: + try: + from datetime import datetime as _dt, timezone as _tz + + d = _dt.strptime(req.observation_date, "%Y-%m-%d").replace(tzinfo=_tz.utc) + timestamp = int(d.timestamp()) + except ValueError: + pass + + if timestamp is not None: + params["metadata"] = params.get("metadata") or {} + params["metadata"]["created_at"] = timestamp + try: result = mem.add(req.messages, **params) return result