Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 73 additions & 61 deletions src/praisonai-agents/praisonaiagents/session/hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def __init__(self, *args, **kwargs):
self._extended_cache: Dict[str, ExtendedSessionData] = {}
self._cache_mtimes: Dict[str, float] = {} # Track file modification times


def _load_session_from_disk(self, session_id: str, filepath: str) -> ExtendedSessionData:
"""Load extended session JSON from disk (caller must hold FileLock)."""
if os.path.exists(filepath):
Expand Down Expand Up @@ -330,13 +331,11 @@ def create_session(

# Update parent's children list without clobbering concurrent message writes
if parent_id:
def _link_child(parent: SessionData) -> None:
if sid not in parent.children_ids:
parent.children_ids.append(sid)

self._modify_session_locked(
parent_id, _link_child, error_label="link child session"
)
def _apply(parent_session: SessionData) -> None:
assert isinstance(parent_session, ExtendedSessionData)
if sid not in parent_session.children_ids:
parent_session.children_ids.append(sid)
self._modify_session_locked(parent_id, _apply, error_label="update parent children")

self._save_extended_session(session)
return sid
Expand Down Expand Up @@ -477,28 +476,30 @@ def revert_to_snapshot(self, session_id: str, snapshot_id: str) -> bool:
Returns:
True if successful
"""
# Read-only lookup to find snapshot without triggering unnecessary writes
session = self._read_session_fresh(session_id)
snapshot = None
for s in session.snapshots:
if s.id == snapshot_id:
snapshot = s
break

if snapshot is None:
logger.warning(f"Snapshot {snapshot_id} not found")
return False

# Now perform the actual revert in a single locked operation
def _revert(session: SessionData) -> None:
def _apply(session: SessionData) -> None:
assert isinstance(session, ExtendedSessionData)

# Find the snapshot
snapshot = None
for s in session.snapshots:
if s.id == snapshot_id:
snapshot = s
break

if snapshot is None:
logger.warning(f"Snapshot {snapshot_id} not found")
raise ValueError(f"Snapshot {snapshot_id} not found")

# Revert messages
if snapshot.message_index >= 0:
session.messages = session.messages[: snapshot.message_index + 1]
session.messages = session.messages[:snapshot.message_index + 1]
else:
session.messages = []

return self._modify_session_locked(
session_id, _revert, error_label="revert to snapshot"
)

try:
return self._modify_session_locked(session_id, _apply, error_label="revert to snapshot")
except ValueError:
return False

def revert_to_message(self, session_id: str, message_index: int) -> bool:
"""
Expand All @@ -511,37 +512,33 @@ def revert_to_message(self, session_id: str, message_index: int) -> bool:
Returns:
True if successful
"""
# Validate message index before writing
session = self._read_session_fresh(session_id)
if message_index < 0 or message_index >= len(session.messages):
logger.warning(f"Invalid message index {message_index}")
def _apply(session: SessionData) -> None:
assert isinstance(session, ExtendedSessionData)

if message_index < 0 or message_index >= len(session.messages):
logger.warning(f"Invalid message index {message_index}")
raise ValueError(f"Invalid message index {message_index}")

session.messages = session.messages[:message_index + 1]

try:
return self._modify_session_locked(session_id, _apply, error_label="revert to message")
except ValueError:
return False

# Valid index, proceed with locked revert
def _revert(session: SessionData) -> None:
session.messages = session.messages[: message_index + 1]

return self._modify_session_locked(
session_id, _revert, error_label="revert to message"
)

def share_session(self, session_id: str) -> bool:
"""Mark a session as shared."""
def _share(session: SessionData) -> None:
def _apply(session: SessionData) -> None:
assert isinstance(session, ExtendedSessionData)
session.is_shared = True

return self._modify_session_locked(
session_id, _share, error_label="share session"
)
return self._modify_session_locked(session_id, _apply, error_label="share session")

def unshare_session(self, session_id: str) -> bool:
"""Mark a session as not shared."""
def _unshare(session: SessionData) -> None:
def _apply(session: SessionData) -> None:
assert isinstance(session, ExtendedSessionData)
session.is_shared = False

return self._modify_session_locked(
session_id, _unshare, error_label="unshare session"
)
return self._modify_session_locked(session_id, _apply, error_label="unshare session")

def is_shared(self, session_id: str) -> bool:
"""Check if a session is shared."""
Expand All @@ -550,12 +547,10 @@ def is_shared(self, session_id: str) -> bool:

def set_title(self, session_id: str, title: str) -> bool:
"""Set session title."""
def _set_title(session: SessionData) -> None:
def _apply(session: SessionData) -> None:
assert isinstance(session, ExtendedSessionData)
session.title = title

return self._modify_session_locked(
session_id, _set_title, error_label="set session title"
)
return self._modify_session_locked(session_id, _apply, error_label="set session title")

async def auto_title(self, session_id: str) -> bool:
"""Generate and set title automatically from first exchange.
Expand Down Expand Up @@ -610,12 +605,19 @@ async def auto_title(self, session_id: str) -> bool:
title = await generate_title_async(user_msg, assistant_msg)

if title and title.strip():
# Reload session to avoid overwriting concurrent updates
fresh_session = await asyncio.to_thread(self._load_extended_session, session_id)
# Only set title if it's still empty
if not fresh_session.title or not fresh_session.title.strip():
fresh_session.title = title.strip()
return await asyncio.to_thread(self._save_extended_session, fresh_session)
# Use locked read-modify-write to avoid overwriting concurrent updates
def _apply(fresh_session: SessionData) -> None:
assert isinstance(fresh_session, ExtendedSessionData)
# Only set title if it's still empty
if not fresh_session.title or not fresh_session.title.strip():
fresh_session.title = title.strip()

return await asyncio.to_thread(
self._modify_session_locked,
session_id,
_apply,
error_label="auto-title session"
)
Comment on lines 607 to +620
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 auto_title now returns True even when no title was set

The refactored implementation returns the result of _modify_session_locked, which is True whenever the locked write succeeds — regardless of whether _apply actually changed anything. If another process sets the title between the early-exit check (line 545) and the locked write, _apply silently no-ops but _modify_session_locked still returns True, violating the documented contract ("True if title was generated and set"). The old code fell through to return False in that concurrent case. Additionally, every invocation now always writes the session back to disk (updating updated_at) even when _apply makes no change.


except Exception as e:
# Title generation failed - log with context instead of silent failure
Expand All @@ -626,8 +628,18 @@ async def auto_title(self, session_id: str) -> bool:
return False

def get_extended_session(self, session_id: str) -> ExtendedSessionData:
"""Get extended session data with smart caching."""
return self._load_extended_session(session_id, force_reload=False)
"""Get extended session data."""
return self._read_session_fresh(session_id)
Comment on lines 630 to +632
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Cached write overwrites updates 🐞 Bug ≡ Correctness

Multiple mutating methods still do read-then-write via _load_extended_session() (which can return
stale _extended_cache data) followed by _save_extended_session(), which can overwrite newer
messages/fields written by another process. get_extended_session() now refreshes from disk, but
write paths like create_session(parent update), set_title/share_session/unshare_session, and
auto_title still risk session truncation in multi-worker deployments.
Agent Prompt
### Issue description
Several `HierarchicalSessionStore` methods mutate sessions by calling `_load_extended_session()` and then `_save_extended_session()`. Because `_load_extended_session()` serves from `_extended_cache` unless `force_reload=True`, these methods can write a stale in-memory snapshot back to disk and **overwrite newer messages/fields** written by another process.

### Issue Context
- `_load_extended_session()` returns cached data when `force_reload=False`.
- `_save_extended_session()` writes the *entire* session JSON (including the messages list), so saving a stale object can truncate newer messages.
- There is already an existing safe primitive: `_modify_session_locked()` (via `DefaultSessionStore._modify_session_locked`) which does a fresh read under `FileLock` and writes atomically.

### Fix Focus Areas
- src/praisonai-agents/praisonaiagents/session/hierarchy.py[214-323]
- src/praisonai-agents/praisonaiagents/session/hierarchy.py[495-577]
- src/praisonai-agents/praisonaiagents/session/hierarchy.py[167-183]
- src/praisonai-agents/praisonaiagents/session/store.py[315-350]

### What to change
1. For any method that **writes** an extended session (e.g., `set_title`, `share_session`, `unshare_session`, `revert_to_snapshot`, parent update inside `create_session`, and the write phase of `auto_title`):
   - Replace the pattern `session = self._load_extended_session(...); mutate; self._save_extended_session(session)` with a single-file-lock read-modify-write using `self._modify_session_locked(...)`.
   - Example for `set_title`:
     ```py
     def set_title(self, session_id: str, title: str) -> bool:
         def _apply(session: SessionData) -> None:
             assert isinstance(session, ExtendedSessionData)
             session.title = title
         return self._modify_session_locked(session_id, _apply, error_label="set session title")
     ```
2. In `create_session(parent_id=...)`, update the parent’s `children_ids` using `_modify_session_locked(parent_id, ...)` so concurrent child creation doesn’t lose updates.
3. In `auto_title`, ensure the final write uses `_modify_session_locked` (or at minimum a `force_reload=True` read under the same `FileLock`) so setting the title cannot overwrite messages appended by other workers.

### Acceptance criteria
- Concurrent cross-process appends to a session are preserved even when `set_title/share_session/unshare_session/create_session(parent update)/auto_title` run on a different worker with a warmed `_extended_cache`.
- Add/extend a regression test similar to `test_get_extended_session_sees_writes_from_other_store` that demonstrates no truncation when a stale reader performs one of these metadata-only writes.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


def invalidate_cache(self, session_id: Optional[str] = None) -> None:
"""Invalidate base and extended in-memory caches atomically."""
# Both cache dicts are modified inside a single `with self._lock` block, so
# the base entry and the extended entry are dropped under the same lock.
with self._lock:
# A truthy session_id evicts only that session; pop(..., None) makes the
# eviction a no-op when the session was never cached.
if session_id:
self._cache.pop(session_id, None)
self._extended_cache.pop(session_id, None)
else:
# A falsy session_id (None, or an empty string) clears every cached
# session from both dicts.
self._cache.clear()
self._extended_cache.clear()
Comment on lines +634 to +642
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make cache invalidation atomic across _cache and _extended_cache.

Line 592 clears base cache before Line 593 acquires the lock for _extended_cache, leaving a race window where stale extended entries can still be served by _load_extended_session().

Suggested fix
 def invalidate_cache(self, session_id: Optional[str] = None) -> None:
     """Invalidate base and extended in-memory caches."""
-    super().invalidate_cache(session_id)
-    with self._lock:
-        if session_id:
-            self._extended_cache.pop(session_id, None)
-        else:
-            self._extended_cache.clear()
+    with self._lock:
+        if session_id:
+            self._cache.pop(session_id, None)
+            self._extended_cache.pop(session_id, None)
+        else:
+            self._cache.clear()
+            self._extended_cache.clear()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai-agents/praisonaiagents/session/hierarchy.py` around lines
590-597, the `invalidate_cache` method clears the base `_cache` via
`super().invalidate_cache()` and only afterwards clears `_extended_cache` under
`self._lock`, leaving a race window between the two steps. To make invalidation
atomic, hold `self._lock` around both operations: either move the
`super().invalidate_cache()` call inside the `with self._lock` block, or
otherwise ensure that both `_cache` and `_extended_cache` are cleared while the
same lock is held, so that `_load_extended_session` cannot see stale entries
during that window.


def export_session(self, session_id: str) -> Dict[str, Any]:
"""
Expand Down
71 changes: 32 additions & 39 deletions src/praisonai-agents/tests/unit/session/test_hierarchy.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,50 +117,43 @@ def test_add_message_preserves_concurrent_writes(self):
assert len(history) == 2
assert history[1]["content"] == "second"

def test_fork_session_preserves_concurrent_messages(self):
"""Registering a fork must not clobber messages added on the parent."""
import threading
import time

def test_get_extended_session_sees_writes_from_other_store(self):
"""Extended reads must reload from disk, not stale _extended_cache."""
with tempfile.TemporaryDirectory() as tmpdir:
# Use two separate store instances to simulate concurrent processes
store1 = HierarchicalSessionStore(session_dir=tmpdir)
store2 = HierarchicalSessionStore(session_dir=tmpdir)

# Create session and add initial message
session_id = store1.create_session(title="Parent")
store1.add_user_message(session_id, "first")

# Use threading to create deterministic interleaving
fork_started = threading.Event()
fork_completed = threading.Event()

def concurrent_fork():
# Signal that fork has started
fork_started.set()
# Small delay to allow message to be added
time.sleep(0.05)
fork_id = store1.fork_session(session_id)
assert fork_id
fork_completed.set()
return fork_id
writer = HierarchicalSessionStore(session_dir=tmpdir)
reader = HierarchicalSessionStore(session_dir=tmpdir)

writer.add_user_message("session-1", "first")
reader._load_extended_session("session-1")
writer.add_user_message("session-1", "second")

session = reader.get_extended_session("session-1")
assert len(session.messages) == 2
assert session.messages[1].content == "second"

Comment on lines +120 to +133
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Add an agentic integration/e2e test for this feature path.

This new unit test is useful, but guideline-mandated feature coverage also requires a real agent flow (agent.start() + LLM response), not only store-level tests.

As per coding guidelines, "Real agentic tests are MANDATORY for every feature: Agent must call agent.start() with a real prompt, call the LLM, and produce actual text response—not just smoke tests of object construction."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai-agents/tests/unit/session/test_hierarchy.py` around lines
120-133, the new unit test
(`test_get_extended_session_sees_writes_from_other_store`) only covers
store-level behavior. Add a complementary agentic integration/e2e test that
exercises the full agent flow. Create two `HierarchicalSessionStore` instances
(writer/reader) on the same `session_dir`, then start a real agent via
`agent.start()` with a real prompt that triggers a write to the session, so
that the writer persists a message. Invoke the reader's
`get_extended_session`/`_load_extended_session` to confirm that the
agent-produced message is visible, asserting that `session.messages` contains
the LLM response text. Ensure the test uses a real LLM client (not mocked), per
the guidelines, and cleans up the tempfile directory.

def test_stale_cache_write_preserves_concurrent_updates(self):
"""Metadata writes must not truncate messages written by other processes."""
with tempfile.TemporaryDirectory() as tmpdir:
writer = HierarchicalSessionStore(session_dir=tmpdir)
reader = HierarchicalSessionStore(session_dir=tmpdir)

# Start fork operation in background thread
fork_thread = threading.Thread(target=concurrent_fork)
fork_thread.start()
# Create session and warm reader's cache
writer.create_session("session-1", title="Original")
reader.get_extended_session("session-1") # Warms cache

# Wait for fork to start, then add concurrent message
fork_started.wait()
store2.add_user_message(session_id, "concurrent_message")
# Writer adds messages, reader has stale cache
writer.add_user_message("session-1", "first message")
writer.add_assistant_message("session-1", "first response")

# Wait for fork to complete
fork_thread.join()
fork_completed.wait()
# Reader performs metadata-only write with stale cache
reader.set_title("session-1", "Updated Title")

# Both messages should be preserved
history = store1.get_chat_history(session_id)
assert len(history) == 2
assert any(msg["content"] == "concurrent_message" for msg in history)
# Verify messages are preserved
session = writer.get_extended_session("session-1")
assert session.title == "Updated Title"
assert len(session.messages) == 2
assert session.messages[0].content == "first message"
assert session.messages[1].content == "first response"

def test_update_session_metadata_preserves_extended_fields(self):
"""Metadata updates must not strip parent_id, snapshots, etc."""
Expand Down
Loading