
feat(memory): Add persistent user memory system to IIAgent #193

Open
namtranii wants to merge 1 commit into Intelligent-Internet:main from namtranii:feat/memory

Memory Feature Architecture

Persistent user memory — the agent remembers facts, preferences, and context across conversations.

1. Problem

Without memory, every conversation starts from zero. The agent cannot recall a user's name, role, preferences, or past decisions. Users must repeat context every session.

Memory solves this by:

  • Auto-extracting facts from conversations (background, no user action needed)
  • Injecting relevant memories into the system prompt on every message
  • Providing an agentic tool so the agent can deliberately save/update/delete memories
  • Giving users a dashboard to view, edit, and control their memories

2. System Design

2.1 Architecture Overview

                    +-----------------+
                    |   Frontend UI   |
                    | (memory-table)  |
                    +--------+--------+
                             |
                    REST API (/v1/memories)
                             |
                    +--------v--------+
                    | MemoryRouter    |  ← FastAPI, auth, pagination
                    +--------+--------+
                             |
                    +--------v--------+
                    | MemoryService   |  ← Business logic + cache
                    +--------+--------+
                        |         |
               +--------v--+  +---v-----------+
               | MemoryRepo|  |MemoryCacheSvc |
               | (Postgres)|  |  (Redis)      |
               +-----------+  +---------------+

    ============= AGENT RUNTIME (separate path) ==============

    User message
        |
    +---v--------------+
    | IIAgent._arun()  |
    +---+---------+----+
        |         |
        |    (background task)
        |         |
   +----v----+  +-v-----------------+
   |aget_    |  |_amake_memories()  |
   |system_  |  |                   |
   |message()|  | MemoryManager     |
   +----+----+  |  .acreate_user_   |
        |       |   memories()      |
   +----v----+  +---+---+-----------+
   |_abuild_ |      |   |
   |memory_  |      |   +---> LLM call (tool-calling)
   |context()|      |              |
   +----+----+  +---v--------+    |
        |       |aget_user_  |    +---> add_memory()    --+
        +------>|memories()  |    +---> update_memory()   |--> DB write
                +---+--------+    +---> delete_memory()  --+    + evict cache
                    |
              +-----v------+
              | Redis cache |  ← hot path: 1 GET, 0 DB on hit
              +-----+------+
                    |miss
              +-----v------+
              | PostgreSQL  |  ← cold path: 1 query, then cache 600s
              +------------+

2.2 Two Execution Contexts

The memory system operates in two distinct contexts that share one cache:

| Context | Class | DB access | Cache access | When |
|---|---|---|---|---|
| Request scope | MemoryService | db: AsyncSession from FastAPI DI | Via injected MemoryCacheService singleton | REST API calls |
| Agent runtime | MemoryManager | get_db_session_local() (own sessions) | Via get_app_container().memory_service._cache (same singleton) | Every agent message |

This dual-context design is intentional: the agent runs outside the HTTP request lifecycle (Socket.IO + background tasks), but must share the same cache to avoid stale reads.

2.3 Domain Structure

src/ii_agent/memory/
    __init__.py          # Public API exports
    models.py            # UserMemory ORM model (Base, UUID PK)
    schemas.py           # MemoryData, MemoryResponse, request/response DTOs
    types.py             # MemorySortField, MemorySortOrder enums
    exceptions.py        # MemoryNotFoundError
    repository.py        # MemoryRepository(BaseRepository) — CRUD, pagination, topics
    service.py           # MemoryService — business logic + cache-aside reads/writes
    cache_service.py     # MemoryCacheService — Redis cache wrapper (5 key families)
    manager.py           # MemoryManager — LLM-based extraction, agentic search
    dependencies.py      # FastAPI Dep aliases (MemoryRepositoryDep, MemoryServiceDep)
    router.py            # REST API (/v1/memories)
    strategies/
        __init__.py
        base.py          # MemoryOptimizationStrategy ABC
        summarize.py     # SummarizeStrategy — compress N memories into 1

3. Data Flow

3.1 Agent Message — Read Path (hot path, every message)

1. IIAgent._arun_stream()
2.   → aget_system_message()
3.     → _abuild_memory_context(user_id)
4.       → MemoryManager.aget_user_memories(user_id)
5.         → Redis GET "memory:list:{uid}"
6.           HIT  → return list[dict] directly (0 DB, 0 Pydantic)
7.           MISS → DB SELECT → build dicts via MemoryData.model_dump(mode="json")
8.                → Redis SET "memory:list:{uid}" TTL 600s
9.                → return list[dict]
10.      → format as XML: <memories_from_previous_interactions>
11.      → append to system message

Design decision: On cache hit (step 6), no DB session is acquired and no Pydantic deserialization occurs. This is critical because this path runs on every single message.
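
The read path above is a standard cache-aside lookup. The following is a minimal sketch of that pattern, not the PR's implementation: `FakeRedis`, `FakeDB`, and `format_memory_context` are hypothetical stand-ins, and the bullet layout inside the XML tag is an assumption (the source only names the wrapping tag).

```python
import asyncio
import json

CACHE_TTL_SECONDS = 600  # TTL stated in the read path above


class FakeRedis:
    """In-memory stand-in for an async Redis client (illustrative only)."""

    def __init__(self):
        self.store = {}

    async def get(self, key):
        return self.store.get(key)

    async def set(self, key, value, ex=None):
        self.store[key] = value  # the TTL is ignored by this stand-in


class FakeDB:
    """Stand-in for the repository; counts queries so the hit path is visible."""

    def __init__(self, rows):
        self.rows = rows
        self.queries = 0

    async def select_memories(self, user_id):
        self.queries += 1
        return [dict(r) for r in self.rows if r["user_id"] == user_id]


async def aget_user_memories(redis, db, user_id):
    key = f"memory:list:{user_id}"
    cached = await redis.get(key)
    if cached is not None:
        return json.loads(cached)  # HIT: plain dicts, no DB access
    rows = await db.select_memories(user_id)  # MISS: one query
    await redis.set(key, json.dumps(rows), ex=CACHE_TTL_SECONDS)
    return rows


def format_memory_context(memories):
    # Assumed layout: one bullet per memory inside the XML tag.
    body = "\n".join(f"- {m['memory']}" for m in memories)
    tag = "memories_from_previous_interactions"
    return f"<{tag}>\n{body}\n</{tag}>"
```

Calling `aget_user_memories` twice for the same user should touch `FakeDB` only once; the second call is served from the cached JSON.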

3.2 Agent Message — Write Path (background, every message)

1. IIAgent._arun_stream()  (after model response)
2.   → asyncio.create_task(_amake_memories())
3.     → MemoryManager.acreate_user_memories(message, user_id)
4.       → aget_user_memories(user_id)         # cache HIT (already warm from read path)
5.       → _acreate_or_update_memories(...)
6.         → build tool closures (add_memory, update_memory, ...)
7.         → model.aresponse(messages, tools)   # secondary LLM call
8.           → LLM decides: "add_memory('User is a Python developer', topics=['occupation'])"
9.             → DB INSERT via MemoryRepository.upsert()
10.            → _evict_memory_cache(user_id)
11.              → container.memory_service._cache.evict_user_memory_data(uid)
12.                → Redis DEL "memory:list:{uid}"
13.                → Redis DEL "memory:topics:{uid}"
14.                → Redis DEL "memory:pver:{uid}"

Design decision: Write path runs as a background asyncio.Task. The agent response is not blocked by memory extraction. Cache eviction happens after each tool call, not batched, to minimize stale windows.
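
The background-task behaviour described above can be sketched as follows. This is an illustration under assumptions: `FakeCache`, `extract_memories`, and `run_turn` are hypothetical names, the secondary LLM call is replaced by a fixed list of facts, and the "DB" is a plain list.

```python
import asyncio


class FakeCache:
    """Stand-in for the shared cache service (illustrative only)."""

    def __init__(self, keys=()):
        self.keys = set(keys)
        self.evictions = 0

    def evict_user_memory_data(self, uid):
        # Mirrors the three DELs listed in the write path.
        for family in ("list", "topics", "pver"):
            self.keys.discard(f"memory:{family}:{uid}")
        self.evictions += 1


async def extract_memories(cache, db, uid, facts):
    """Each fact plays the role of one add_memory tool call."""
    for fact in facts:
        await asyncio.sleep(0)             # yield, as a real DB write would
        db.append((uid, fact))             # stands in for the DB INSERT
        cache.evict_user_memory_data(uid)  # evict after every call, not batched


async def run_turn(cache, db, uid, facts):
    response = "model response"  # the main LLM call would happen here
    memory_task = asyncio.create_task(extract_memories(cache, db, uid, facts))
    # The response can be delivered here while extraction runs concurrently.
    await memory_task  # the run only completes once extraction has finished
    return response
```

Evicting per tool call costs a few extra Redis DELs but keeps the window in which a reader can see stale data limited to a single write.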

3.3 Dashboard API — Read Path

GET /v1/memories?page=1&per_page=10&sort_by=updated_at&sort_order=desc

1. MemoryRouter.list_memories()
2.   → MemoryService.get_memories_paginated(db, user_id, ...)
3.     → compute params_hash = md5(page:per_page:search:topics:sort:order)[:12]
4.     → MemoryCacheService.get_paginated_memories(uid, params_hash)
5.       → Redis GET "memory:pver:{uid}" → version "1743638400000"
6.       → Redis GET "memory:page:{uid}:1743638400000:{params_hash}"
7.         HIT  → deserialize → return
8.         MISS → DB query (LIMIT/OFFSET + filters) → Redis SET TTL 300s → return

Design decision: Paginated results use version-based cache invalidation. When memories change, the version key (pver:{uid}) is deleted. Old page keys become orphaned and expire via TTL (300s). This avoids scanning for all page keys on eviction.
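
A minimal sketch of version-based invalidation, assuming a dict in place of Redis. `VersionedPageCache` and `make_params_hash` are hypothetical names. The source uses a timestamp as the version; this sketch substitutes an incrementing counter so the behaviour is deterministic.

```python
import hashlib
import itertools


def make_params_hash(page, per_page, search, topics, sort_by, sort_order):
    raw = f"{page}:{per_page}:{search}:{topics}:{sort_by}:{sort_order}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()[:12]


class VersionedPageCache:
    """Dict-backed stand-in for the paginated cache (illustrative only)."""

    def __init__(self):
        self.store = {}
        self._versions = itertools.count(1)  # assumption: counter, not timestamp

    def _current_version(self, uid):
        key = f"memory:pver:{uid}"
        if key not in self.store:
            self.store[key] = str(next(self._versions))  # auto-create on miss
        return self.store[key]

    def _page_key(self, uid, params_hash):
        return f"memory:page:{uid}:{self._current_version(uid)}:{params_hash}"

    def get_page(self, uid, params_hash):
        return self.store.get(self._page_key(uid, params_hash))

    def set_page(self, uid, params_hash, payload):
        self.store[self._page_key(uid, params_hash)] = payload

    def evict_user(self, uid):
        # Only the version key is deleted; page keys are left to expire.
        self.store.pop(f"memory:pver:{uid}", None)
```

After `evict_user`, the next lookup creates a new version, so every previously cached page becomes unreachable without any key scan. In Redis the orphaned page keys would then age out through their TTL.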

3.4 User Preferences — Toggle Memory On/Off

PATCH /auth/me/preferences  {"has_memory": false}

1. Update user_metadata JSONB in users table
2. Redis DEL "memory:prefs:{uid}"
3. Next agent run:
   → load_user_memory_preferences() → cache MISS → DB read → {"has_memory": false}
   → tool_args = {"has_memory": false, ...}
   → AgentFactory: has_memory=false → memory_manager=None (when agentic_memory is also off; see 5.2)
   → Agent runs WITHOUT memory (no context injection, no auto-extraction)
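
The toggle flow above can be sketched as a write-then-evict pair. This is an illustration only: `PrefsStore` is a hypothetical class, dicts stand in for the users table and Redis, and the default of `True` for a missing preference is an assumption taken from the factory default in 5.2.

```python
class PrefsStore:
    """Dict-backed stand-in for the users table plus the prefs cache."""

    def __init__(self):
        self.user_metadata = {}  # uid -> JSONB-like dict
        self.cache = {}
        self.db_reads = 0

    def patch_preferences(self, uid, patch):
        meta = self.user_metadata.setdefault(uid, {})
        meta.setdefault("preferences", {}).update(patch)  # step 1: update JSONB
        self.cache.pop(f"memory:prefs:{uid}", None)       # step 2: Redis DEL

    def load_user_memory_preferences(self, uid):
        key = f"memory:prefs:{uid}"
        if key in self.cache:
            return self.cache[key]
        self.db_reads += 1  # cache MISS: read from the users table
        prefs = self.user_metadata.get(uid, {}).get("preferences", {})
        result = {"has_memory": prefs.get("has_memory", True)}  # assumed default
        self.cache[key] = result
        return result
```

Because the PATCH deletes the cached entry, the next agent run reads the new value from the database rather than waiting for the 3600s TTL to lapse.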

4. Cache Design

4.1 Key Schema

All keys are namespaced by EntityCache._make_key() → final Redis key = memory:{key}.

| Key | Data | TTL | Set by | Evicted by |
|---|---|---|---|---|
| list:{uid} | list[dict] (all memories, full schema) | 600s | Service (miss), Manager (miss) | Any memory write |
| topics:{uid} | list[str] (distinct topic tags) | 600s | Service (miss) | Any memory write |
| pver:{uid} | str (timestamp version) | 600s | CacheService (auto-create) | Any memory write |
| page:{uid}:{ver}:{hash} | {"memories": [...], "total": int} | 300s | Service (miss) | Orphaned on pver evict → TTL decay |
| prefs:{uid} | {"has_memory": bool} | 3600s | Service (miss) | PATCH /auth/me/preferences |

4.2 Cache Instance Singleton

ApplicationContainer.init()
  └── memory_cache_backend = get_entity_cache(redis_client, namespace="memory")  → instance A
  └── memory_cache_svc = MemoryCacheService(cache=A)                             → instance B
  └── memory_svc = MemoryService(cache=B)                                        → instance C
  └── container.memory_service = C

All access paths resolve to the SAME instance:
  MemoryService._cache           → B → A
  MemoryManager (via container)  → container.C._cache → B → A
  _evict_memory_cache()          → container.C._cache → B → A
  users/router.py                → container.C._cache → B → A
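
The wiring above can be checked with a small identity test. The classes below are hypothetical skeletons that only mirror the names in the diagram; the real constructors and `get_entity_cache` signature may differ.

```python
class EntityCache:
    def __init__(self, namespace):
        self.namespace = namespace


class MemoryCacheService:
    def __init__(self, cache):
        self.cache = cache


class MemoryService:
    def __init__(self, cache):
        self._cache = cache


class ApplicationContainer:
    def __init__(self):
        backend = EntityCache(namespace="memory")      # instance A
        cache_svc = MemoryCacheService(cache=backend)  # instance B
        self.memory_service = MemoryService(cache=cache_svc)  # instance C


container = ApplicationContainer()

# Request scope and agent runtime both reach the cache through the container.
via_service = container.memory_service._cache
via_manager = container.memory_service._cache
```

Since every path goes through `container.memory_service._cache`, an eviction issued by the agent runtime is immediately visible to REST handlers, and the reverse holds as well.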

4.3 Format Compatibility

Both MemoryService and MemoryManager can SET the list:{uid} cache key. They MUST use identical serialization:

# Both use:
MemoryData(...).model_dump(mode="json")

# Produces:
{
    "memory_id": "abc",
    "memory": "User is a Python developer",
    "topics": ["occupation"],
    "user_id": "12345678-...",       # UUID as string
    "input": "I'm a Python dev",
    "agent_id": "agent-1",
    "created_at": "2026-01-01T00:00:00Z",  # ISO 8601 with Z
    "updated_at": "2026-01-02T00:00:00Z",
}

MemoryService reads back via MemoryData.model_validate(d) → Pydantic parses strings back to UUID/datetime.
MemoryManager reads back as raw dicts → accesses d["memory_id"], d["memory"] directly.

5. Agent Integration

5.1 IIAgent Fields

@dataclass
class IIAgent:
    # Memory fields (added to existing agent)
    memory_manager: Optional[MemoryManager] = None
    enable_agentic_memory: bool = False       # gives agent update_user_memory tool
    update_memory_on_run: bool = False         # auto-extract after each message
    add_memories_to_context: bool = False      # inject into system prompt

5.2 Factory Wiring

# AgentFactory.create_agent()
has_memory = tool_args.get("has_memory", True)          # from user preferences
has_agentic_memory_tool = tool_args.get("agentic_memory", False)

memory_manager = MemoryManager(model=model) if (has_memory or has_agentic_memory_tool) else None

agent = IIAgent(
    memory_manager=memory_manager,
    update_memory_on_run=has_memory,
    enable_agentic_memory=has_agentic_memory_tool,
    add_memories_to_context=has_memory or has_agentic_memory_tool,
)
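
To make the flag combinations explicit, the sketch below restates the wiring as a pure function. `derive_memory_flags` and the `needs_manager` key are hypothetical names introduced for illustration; the boolean expressions are copied from the factory code above.

```python
def derive_memory_flags(tool_args):
    """Map tool_args to the memory-related IIAgent settings."""
    has_memory = tool_args.get("has_memory", True)
    agentic = tool_args.get("agentic_memory", False)
    return {
        "needs_manager": has_memory or agentic,  # MemoryManager is created
        "update_memory_on_run": has_memory,
        "enable_agentic_memory": agentic,
        "add_memories_to_context": has_memory or agentic,
    }
```

Note that with `has_memory=False` and `agentic_memory=True`, a manager is still created and memories are still injected into context; only the automatic extraction is turned off.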

5.3 Run Lifecycle Integration

_arun_stream():
  ├── aget_system_message()             # injects memories if add_memories_to_context
  ├── model.aresponse()                 # main LLM call (may call update_user_memory tool)
  ├── create_task(_amake_memories())    # background memory extraction
  ├── post-hooks
  ├── await memory_task                 # wait for background extraction
  └── RunCompletedEvent

6. Database

6.1 Table: user_memories

CREATE TABLE user_memories (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    memory_id   VARCHAR NOT NULL UNIQUE,     -- logical ID for upsert
    user_id     UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    agent_id    VARCHAR,                     -- which agent created it
    memory      VARCHAR NOT NULL,            -- the memory text
    topics      JSONB,                       -- ["occupation", "preferences"]
    input       VARCHAR,                     -- original user message
    version     BIGINT NOT NULL DEFAULT 0,   -- optimistic locking
    created_at  TIMESTAMPTZ DEFAULT now(),
    updated_at  TIMESTAMPTZ DEFAULT now()
);

-- Indexes
CREATE INDEX ix_user_memories_user_id ON user_memories(user_id);
CREATE INDEX ix_user_memories_user_agent ON user_memories(user_id, agent_id);
CREATE UNIQUE INDEX ix_user_memories_memory_id ON user_memories(memory_id);
CREATE INDEX ix_user_memories_updated_at ON user_memories(updated_at);

6.2 User Preferences

Stored in users.metadata JSONB (no schema migration needed):

{
    "preferences": {
        "has_memory": true
    }
}

Exposed via User.preferences property and UserPublic.preferences schema.

7. API Endpoints

| Method | Path | Auth | Description |
|---|---|---|---|
| GET | /v1/memories | JWT | List with pagination, search, topic filter, sort |
| GET | /v1/memories/topics | JWT | Distinct topic tags for filter dropdown |
| GET | /v1/memories/{memory_id} | JWT | Single memory |
| POST | /v1/memories | JWT | Create memory |
| PUT | /v1/memories/{memory_id} | JWT | Update memory |
| DELETE | /v1/memories/{memory_id} | JWT | Delete memory |
| POST | /v1/memories/bulk-delete | JWT | Batch delete |
| PATCH | /auth/me/preferences | JWT | Toggle has_memory |

8. Chat Mode Integration

Chat mode (/v1/chat) uses a completely different execution path from agent mode (Socket.IO). Memory is integrated via helper functions in chat_service.py.

8.1 Architecture Difference

| Aspect | Agent mode | Chat mode |
|---|---|---|
| Entry point | Socket.IO query handler | REST POST /v1/chat/messages |
| System prompt | Built in IIAgent.aget_system_message() | Hardcoded in each LLM provider (Anthropic/OpenAI/Gemini) |
| Memory read | _abuild_memory_context() → appends to system message | _load_memory_context_for_chat() → prepends system-role Message to context |
| Memory write | _amake_memories() via asyncio.Task after model response | _extract_memories_for_chat() via asyncio.Task after LLM loop completes |
| LLM for extraction | Same model as agent (from factory) | Same model as chat (from model_config) |

8.2 Read Path (Chat Mode)

ChatService.stream_chat_response()
  → ContextWindowManager.load_context_for_llm()    # load message history
  → _load_memory_context_for_chat(user_id, container)
    → Check has_memory preference (cached)
    → Redis GET "memory:list:{uid}"
      HIT  → build Message(role=SYSTEM, parts=[TextContent(memory XML)])
      MISS → DB query → cache → build Message
    → messages.insert(0, memory_msg)               # prepend to context
  → LLM provider picks up system-role message as part of system prompt

The system-role message is handled differently by each provider:

  • Anthropic: convert_to_anthropic_messages() extracts system-role messages and merges into system param
  • OpenAI: System messages are passed inline
  • Gemini: System messages become system_instruction
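
As an illustration of the Anthropic-style handling, the sketch below prepends a memory message and then separates system-role messages from the rest. It is not the PR's `convert_to_anthropic_messages()`: the `Message` dataclass, `prepend_memory`, and `split_system` are hypothetical, simplified stand-ins.

```python
from dataclasses import dataclass


@dataclass
class Message:
    role: str
    text: str


def prepend_memory(messages, memory_xml):
    """Put the memory block first, as messages.insert(0, ...) does."""
    return [Message("system", memory_xml), *messages]


def split_system(messages):
    """Merge system-role texts into one string; return it with the rest."""
    system_parts = [m.text for m in messages if m.role == "system"]
    rest = [m for m in messages if m.role != "system"]
    return "\n\n".join(system_parts), rest
```

A provider that takes the system prompt as a separate parameter would pass the merged string there and send only the remaining messages as conversation turns.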

8.3 Write Path (Chat Mode)

ChatService.stream_chat_response()
  → LLM turn loop completes
  → asyncio.create_task(_extract_memories_for_chat(...))
    → Check has_memory preference
    → Convert ModelConfig → LLMConfig
    → MemoryManager(model=get_model(llm_config)).acreate_user_memories(message, user_id)
      → (same flow as agent mode: LLM tool-calling → DB write → cache evict)

8.4 Council Mode

Council mode (stream_council_chat_response) also has memory integration:

  • Read: Same _load_memory_context_for_chat() prepended to messages (shared by all council models)
  • Write: Background extraction uses the first council model's config

9. Files Modified (Integration Points)

| File | Change |
|---|---|
| agents/agent.py | Memory fields, _abuild_memory_context, _amake_memories, agentic tool, run lifecycle |
| agents/factory/agent.py | Read has_memory/agentic_memory from tool_args, create MemoryManager |
| agents/runs/agent.py | MemoryUpdateStartedEvent, MemoryUpdateCompletedEvent (already existed) |
| agents/runs/events.py | create_memory_update_started/completed_event (already existed) |
| chat/application/chat_service.py | _load_memory_context_for_chat, _extract_memories_for_chat, inject into stream_chat_response + stream_council_chat_response |
| users/models.py | User.preferences property |
| users/schemas.py | UserPreferences model, field on UserPublic |
| users/router.py | PATCH /auth/me/preferences endpoint |
| core/container.py | Wire MemoryRepository, MemoryCacheService, MemoryService |
| app/routers.py | Register memory router at /v1/memories |
| realtime/handlers/query.py | Merge user memory preferences into tool_args |
| frontend/src/services/memory.service.ts | API calls to /v1/memories |

@PhungVanDuy PhungVanDuy requested a review from khoangothe April 6, 2026 08:02