Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions benchmarks/common/mem0_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,73 @@ def _parse_event_results(event_data: dict) -> list[dict]:
results.append(entry)
return results

# =========================================================================
# User profile
# =========================================================================

async def get_user_profile(self, user_id: str) -> dict | None:
"""Fetch all memories for a user and return them as a profile dict.

For cloud mode, calls the ``/v1/entities/`` endpoint.
For OSS mode, fetches all memories via ``GET /memories`` and aggregates
them into a simple profile structure.

Returns:
Dict with profile fields, or None on failure.
"""
if self.mode == "cloud":
return await self._get_user_profile_cloud(user_id)
return await self._get_user_profile_oss(user_id)

async def _get_user_profile_oss(self, user_id: str) -> dict | None:
"""Build a lightweight profile from the user's stored memories."""
session = await self._get_session()
try:
async with self.limiter:
async with session.get(
f"{self.host}/memories",
params={"user_id": user_id},
) as resp:
resp.raise_for_status()
data = await resp.json()

memories = data.get("results", data) if isinstance(data, dict) else data
if not isinstance(memories, list) or not memories:
return None

return {
"user_id": user_id,
"memory_count": len(memories),
"memories": [
m.get("memory", m.get("data", ""))
for m in memories
if isinstance(m, dict)
],
}
except Exception as exc:
logger.warning("Failed to get user profile for %s: %s", user_id, exc)
return None

async def _get_user_profile_cloud(self, user_id: str) -> dict | None:
"""Fetch user entity profile from Mem0 cloud API."""
session = await self._get_session()
try:
async with self.limiter:
async with session.get(
f"{self.host}/v1/entities/user/{user_id}/",
) as resp:
if resp.status == 404:
return None
resp.raise_for_status()
data = await resp.json()

if isinstance(data, dict):
return data
return None
except Exception as exc:
logger.warning("Failed to get user profile for %s: %s", user_id, exc)
return None


# ---------------------------------------------------------------------------
# Helper
Expand Down
134 changes: 68 additions & 66 deletions benchmarks/locomo/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,86 +288,88 @@ async def ingest_conversation(
debug_path = os.path.join(debug_dir, f"conv_{conv_idx}_ingestion.txt")
debug_mode = "a" if chunks_already_done else "w"
debug_file = open(debug_path, debug_mode, encoding="utf-8")
if not chunks_already_done:
debug_file.write(f"{'=' * 80}\n")
debug_file.write(f"CONVERSATION {conv_idx}: {speaker_a} & {speaker_b}\n")
debug_file.write(f"Sessions: {len(sorted_sessions)}, Chunks: {total_chunks}\n")
debug_file.write(f"User ID: {user_id}\n")
debug_file.write(f"{'=' * 80}\n\n")

pbar = tqdm(total=total_chunks, desc=f"Ingest conv {conv_idx}", initial=len(chunks_already_done), leave=True)
total_processed = len(chunks_already_done)
total_failed = 0

for session_key, date_str, turns in sorted_sessions:
chunks = session_to_chunks(turns, speaker_a, speaker_b)
if not chunks:
continue
try:
if debug_file and not chunks_already_done:
debug_file.write(f"{'=' * 80}\n")
debug_file.write(f"CONVERSATION {conv_idx}: {speaker_a} & {speaker_b}\n")
debug_file.write(f"Sessions: {len(sorted_sessions)}, Chunks: {total_chunks}\n")
debug_file.write(f"User ID: {user_id}\n")
debug_file.write(f"{'=' * 80}\n\n")

for session_key, date_str, turns in sorted_sessions:
chunks = session_to_chunks(turns, speaker_a, speaker_b)
if not chunks:
continue

session_epoch = locomo_date_to_epoch(date_str)
session_epoch = locomo_date_to_epoch(date_str)

if debug_file and f"{session_key}_header" not in chunks_already_done:
debug_file.write(f"\n{'---' * 27}\n")
debug_file.write(f"SESSION: {session_key} | Date: {date_str} | Chunks: {len(chunks)}\n")
debug_file.write(f"{'---' * 27}\n\n")
if debug_file and f"{session_key}_header" not in chunks_already_done:
debug_file.write(f"\n{'---' * 27}\n")
debug_file.write(f"SESSION: {session_key} | Date: {date_str} | Chunks: {len(chunks)}\n")
debug_file.write(f"{'---' * 27}\n\n")

for chunk_idx, messages in enumerate(chunks):
chunk_key = f"{session_key}_c{chunk_idx}"
for chunk_idx, messages in enumerate(chunks):
chunk_key = f"{session_key}_c{chunk_idx}"

if chunk_key in chunks_already_done:
continue
if chunk_key in chunks_already_done:
continue

if shutdown.requested:
logger.info("Shutdown requested at conv %d, chunk %s", conv_idx, chunk_key)
return True, user_id, total_processed

# Skip empty messages
if any(not msg.get("content", "").strip() for msg in messages):
chunks_already_done.add(chunk_key)
total_processed += 1
pbar.update(1)
continue

if shutdown.requested:
logger.info("Shutdown requested at conv %d, chunk %s", conv_idx, chunk_key)
pbar.close()
if debug_file:
debug_file.close()
return True, user_id, total_processed
debug_file.write(f"--- Chunk {chunk_idx} ({len(messages)} messages) ---\n")
for msg in messages:
debug_file.write(f" {msg['role']}: {msg['content']}\n")
debug_file.write("\n")

response = await mem0.add(messages, user_id, timestamp=session_epoch)

if response is not None:
total_processed += 1
if debug_file:
results = response.get("results", [])
if results:
debug_file.write(f"--- Chunk {chunk_idx} (extracted) ---\n")
for mem_item in results:
mem_text = mem_item.get("memory", "")
event_type = mem_item.get("event", "")
debug_file.write(f" [{event_type}] {mem_text}\n")
debug_file.write("\n")
else:
total_failed += 1
logger.warning("Ingestion failed: conv %d %s chunk %d", conv_idx, session_key, chunk_idx)

# Skip empty messages
if any(not msg.get("content", "").strip() for msg in messages):
chunks_already_done.add(chunk_key)
total_processed += 1
checkpoint.save_progress(key, {
"conversation_idx": conv_idx,
"user_id": user_id,
"run_id": run_id,
"chunk_size": CHUNK_SIZE,
"completed_chunks": list(chunks_already_done),
})
pbar.update(1)
continue

if debug_file:
debug_file.write(f"--- Chunk {chunk_idx} ({len(messages)} messages) ---\n")
for msg in messages:
debug_file.write(f" {msg['role']}: {msg['content']}\n")
debug_file.write("\n")

response = await mem0.add(messages, user_id, timestamp=session_epoch)

if response is not None:
total_processed += 1
if debug_file:
results = response.get("results", [])
if results:
debug_file.write(f"--- Chunk {chunk_idx} (extracted) ---\n")
for mem_item in results:
mem_text = mem_item.get("memory", "")
event_type = mem_item.get("event", "")
debug_file.write(f" [{event_type}] {mem_text}\n")
debug_file.write("\n")
else:
total_failed += 1
logger.warning("Ingestion failed: conv %d %s chunk %d", conv_idx, session_key, chunk_idx)

chunks_already_done.add(chunk_key)
checkpoint.save_progress(key, {
"conversation_idx": conv_idx,
"user_id": user_id,
"run_id": run_id,
"chunk_size": CHUNK_SIZE,
"completed_chunks": list(chunks_already_done),
})
pbar.update(1)

pbar.close()
if debug_file:
debug_file.write(f"\nSUMMARY: {total_processed}/{total_chunks} OK, {total_failed} failed\n")
debug_file.close()
finally:
pbar.close()
if debug_file:
debug_file.write(
f"\nSUMMARY: {total_processed}/{total_chunks} OK, {total_failed} failed\n"
)
# Always close the debug file handle here so it is not leaked on early return or error
debug_file.close()

checkpoint.save_complete(key, {
"conversation_idx": conv_idx,
Expand Down
Loading