From 4bc50a7d3834895eb398cf5f15daa75ea49794a9 Mon Sep 17 00:00:00 2001 From: Atosik Date: Sun, 22 Feb 2026 12:16:32 +0100 Subject: [PATCH 1/6] feat(mcp): add streamable-http transport mode MCP clients like Claude Code connect over HTTP but need the streamable-http transport -- not SSE or stdio. Without this mode, deploying agent-memory-server as a network service (e.g. in Kubernetes) requires a custom patch. Changes: - Add "streamable-http" as a --mode choice in the MCP CLI - Add run_streamable_http_async() with Redis connection init - Set stateless_http=True so clients that skip the MCP init handshake (common with subagent/tool-use patterns) still work - Fix SSE handler to return Response() instead of None, preventing TypeError when SSE clients disconnect - Pass stateless=True to _mcp_server.run() for SSE connections --- agent_memory_server/cli.py | 7 +++++-- agent_memory_server/mcp.py | 13 ++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/agent_memory_server/cli.py b/agent_memory_server/cli.py index 5be7ba4..648de72 100644 --- a/agent_memory_server/cli.py +++ b/agent_memory_server/cli.py @@ -319,8 +319,8 @@ def api(port: int, host: str, reload: bool, no_worker: bool, task_backend: str): @click.option( "--mode", default="stdio", - help="Run the MCP server in SSE or stdio mode", - type=click.Choice(["stdio", "sse"]), + help="Run the MCP server in SSE, streamable-http, or stdio mode", + type=click.Choice(["stdio", "sse", "streamable-http"]), ) @click.option( "--task-backend", @@ -362,6 +362,9 @@ async def setup_and_run(): if mode == "sse": logger.info(f"Starting MCP server on port {port}\n") await mcp_app.run_sse_async() + elif mode == "streamable-http": + logger.info(f"Starting MCP server (streamable HTTP) on port {port}\n") + await mcp_app.run_streamable_http_async() elif mode == "stdio": await mcp_app.run_stdio_async() else: diff --git a/agent_memory_server/mcp.py b/agent_memory_server/mcp.py index 1fce971..436afd4 100644 --- 
a/agent_memory_server/mcp.py +++ b/agent_memory_server/mcp.py @@ -80,6 +80,7 @@ class FastMCP(_FastMCPBase): """Extend FastMCP to support optional URL namespace and default STDIO namespace.""" def __init__(self, *args, default_namespace=None, **kwargs): + kwargs.setdefault("stateless_http", True) super().__init__(*args, **kwargs) self.default_namespace = default_namespace self._current_request = None # Initialize the attribute @@ -88,11 +89,12 @@ def sse_app(self): from mcp.server.sse import SseServerTransport from starlette.applications import Starlette from starlette.requests import Request + from starlette.responses import Response from starlette.routing import Mount, Route sse = SseServerTransport(self.settings.message_path) - async def handle_sse(request: Request) -> None: + async def handle_sse(request: Request): # Store the request in the FastMCP instance so call_tool can access it self._current_request = request @@ -106,10 +108,12 @@ async def handle_sse(request: Request) -> None: read_stream, write_stream, self._mcp_server.create_initialization_options(), + stateless=True, ) finally: # Clean up request reference self._current_request = None + return Response() return Starlette( debug=self.settings.debug, @@ -175,6 +179,13 @@ async def run_sse_async(self): uvicorn.Config(app, host="0.0.0.0", port=int(self.settings.port)) ).serve() + async def run_streamable_http_async(self): + """Start streamable HTTP MCP server.""" + from agent_memory_server.utils.redis import get_redis_conn + + await get_redis_conn() + return await super().run_streamable_http_async() + async def run_stdio_async(self): """Start STDIO MCP server.""" from agent_memory_server.utils.redis import get_redis_conn From 6b006d1e6c12b30499008e7853b8f95ad999d8de Mon Sep 17 00:00:00 2001 From: Atosik Date: Sun, 22 Feb 2026 12:26:02 +0100 Subject: [PATCH 2/6] fix: address review comments - Add return type hint `-> Response` to handle_sse - Update docs/cli.md with streamable-http mode and usage example 
- Add CLI tests for streamable-http: basic mode, asyncio default, docket backend (matching existing sse/stdio test patterns) --- agent_memory_server/mcp.py | 2 +- docs/cli.md | 7 +++-- tests/test_cli.py | 56 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/agent_memory_server/mcp.py b/agent_memory_server/mcp.py index 436afd4..76b23d1 100644 --- a/agent_memory_server/mcp.py +++ b/agent_memory_server/mcp.py @@ -94,7 +94,7 @@ def sse_app(self): sse = SseServerTransport(self.settings.message_path) - async def handle_sse(request: Request): + async def handle_sse(request: Request) -> Response: # Store the request in the FastMCP instance so call_tool can access it self._current_request = request diff --git a/docs/cli.md b/docs/cli.md index 962213e..50c6029 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -60,7 +60,7 @@ agent-memory mcp [OPTIONS] **Options:** - `--port INTEGER`: Port to run the MCP server on. (Default: value from `settings.mcp_port`, usually 9000) -- `--mode [stdio|sse]`: Run the MCP server in stdio or SSE mode. (Default: stdio) +- `--mode [stdio|sse|streamable-http]`: Run the MCP server in stdio, SSE, or streamable-http mode. (Default: stdio) - `--task-backend [asyncio|docket]`: Background task backend. `asyncio` (default) runs tasks inline in the MCP process with no separate worker. `docket` sends tasks to a Docket queue, which requires running `agent-memory task-worker`. **Examples:** @@ -72,11 +72,14 @@ agent-memory mcp # SSE mode for development (no separate worker needed) agent-memory mcp --mode sse --port 9001 +# Streamable HTTP mode for network deployments (e.g. Kubernetes) +agent-memory mcp --mode streamable-http --port 9000 + # SSE mode for production (requires separate worker process) agent-memory mcp --mode sse --port 9001 --task-backend docket ``` -**Note:** Stdio mode is designed for tools like Claude Desktop and, by default, uses the asyncio backend (no worker). 
Use `--task-backend docket` if you want MCP to enqueue background work into a shared Docket worker. +**Note:** Stdio mode is designed for tools like Claude Desktop and, by default, uses the asyncio backend (no worker). Streamable HTTP mode is suited for deploying the MCP server as a network service where HTTP clients (like Claude Code) connect over the network. Use `--task-backend docket` if you want MCP to enqueue background work into a shared Docket worker. ### `schedule-task` diff --git a/tests/test_cli.py b/tests/test_cli.py index 5c9ea48..689b6e7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -284,6 +284,62 @@ def test_mcp_command_sse_mode(self, mock_mcp_app, mock_settings): assert result.exit_code == 0 mock_mcp_app.run_sse_async.assert_called_once() + @patch("agent_memory_server.cli.settings") + @patch("agent_memory_server.mcp.mcp_app") + def test_mcp_command_streamable_http_mode(self, mock_mcp_app, mock_settings): + """Test mcp command in streamable-http mode.""" + mock_settings.mcp_port = 3001 + + mock_mcp_app.run_streamable_http_async = AsyncMock() + + runner = CliRunner() + result = runner.invoke(mcp, ["--mode", "streamable-http", "--port", "9000"]) + + assert result.exit_code == 0 + mock_mcp_app.run_streamable_http_async.assert_called_once() + + @patch("agent_memory_server.cli.configure_logging") + @patch("agent_memory_server.mcp.mcp_app") + def test_mcp_command_streamable_http_mode_uses_asyncio_by_default( + self, mock_mcp_app, mock_configure_logging + ): + """Test that streamable-http mode uses asyncio backend by default.""" + from agent_memory_server.config import settings + + # Set initial state + settings.use_docket = True + + mock_mcp_app.run_streamable_http_async = AsyncMock() + + runner = CliRunner() + result = runner.invoke(mcp, ["--mode", "streamable-http"]) + + assert result.exit_code == 0 + assert settings.use_docket is False + mock_mcp_app.run_streamable_http_async.assert_called_once() + + 
@patch("agent_memory_server.cli.configure_logging") + @patch("agent_memory_server.mcp.mcp_app") + def test_mcp_command_streamable_http_mode_with_task_backend_docket( + self, mock_mcp_app, mock_configure_logging + ): + """Test that streamable-http mode with --task-backend=docket sets use_docket=True.""" + from agent_memory_server.config import settings + + # Set initial state + settings.use_docket = False + + mock_mcp_app.run_streamable_http_async = AsyncMock() + + runner = CliRunner() + result = runner.invoke( + mcp, ["--mode", "streamable-http", "--task-backend", "docket"] + ) + + assert result.exit_code == 0 + assert settings.use_docket is True + mock_mcp_app.run_streamable_http_async.assert_called_once() + @patch("agent_memory_server.cli.configure_mcp_logging") @patch("agent_memory_server.cli.settings") @patch("agent_memory_server.mcp.mcp_app") From 9d32fc4816e5f4f7efb01d1c5d3512b25a903558 Mon Sep 17 00:00:00 2001 From: Piotr Zaniewski Date: Sun, 22 Feb 2026 15:26:55 +0100 Subject: [PATCH 3/6] fix(mcp): return empty working memory instead of none for missing sessions core_get_working_memory returns None when no session exists. the mcp tool declares -> WorkingMemory return type, so the mcp sdk tries pydantic model_validate(None) which fails with "expected string or bytes-like object, got 'Header'" on streamable-http transport. 
--- agent_memory_server/mcp.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/agent_memory_server/mcp.py b/agent_memory_server/mcp.py index 76b23d1..d7de9f0 100644 --- a/agent_memory_server/mcp.py +++ b/agent_memory_server/mcp.py @@ -917,9 +917,12 @@ async def get_working_memory( Returns: Working memory containing messages, context, and structured memory records """ - return await core_get_working_memory( + result = await core_get_working_memory( session_id=session_id, recent_messages_limit=recent_messages_limit ) + if result is None: + return WorkingMemory(session_id=session_id, messages=[], memories=[]) + return result @mcp_app.tool() From 985e244895a8bdfd6348840c64226ad5f46287c8 Mon Sep 17 00:00:00 2001 From: Piotr Zaniewski Date: Sun, 22 Feb 2026 15:37:20 +0100 Subject: [PATCH 4/6] fix(mcp): return empty working memory instead of none for missing sessions The MCP get_working_memory handler imported the FastAPI route handler from api.py, which has a Starlette Header() default parameter for x_client_version. When called from MCP (not HTTP), the Header object is passed as-is to re.match(), causing "expected string or bytes-like object, got 'Header'". Fix: call working_memory.get_working_memory (the core function) directly instead of the API route handler, and guard the None return with an empty WorkingMemory. Also add an mcp_host setting (default "0.0.0.0") to config.py and pass it to FastMCP as host. 
--- agent_memory_server/config.py | 1 + agent_memory_server/mcp.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/agent_memory_server/config.py b/agent_memory_server/config.py index 002eae4..edd65f3 100644 --- a/agent_memory_server/config.py +++ b/agent_memory_server/config.py @@ -358,6 +358,7 @@ class Settings(BaseSettings): "gpt-5-mini" # Faster, smaller model for quick tasks like query optimization ) port: int = 8000 + mcp_host: str = "0.0.0.0" mcp_port: int = 9000 # Memory vector database factory configuration diff --git a/agent_memory_server/mcp.py b/agent_memory_server/mcp.py index d7de9f0..5ddca26 100644 --- a/agent_memory_server/mcp.py +++ b/agent_memory_server/mcp.py @@ -9,12 +9,12 @@ create_long_term_memory as core_create_long_term_memory, delete_long_term_memory as core_delete_long_term_memory, get_long_term_memory as core_get_long_term_memory, - get_working_memory as core_get_working_memory, memory_prompt as core_memory_prompt, put_working_memory_core as core_put_working_memory, search_long_term_memory as core_search_long_term_memory, update_long_term_memory as core_update_long_term_memory, ) +from agent_memory_server import working_memory as working_memory_core from agent_memory_server.config import settings from agent_memory_server.dependencies import get_background_tasks from agent_memory_server.filters import ( @@ -202,6 +202,7 @@ async def run_stdio_async(self): mcp_app = FastMCP( "Redis Agent Memory Server", + host=settings.mcp_host, port=settings.mcp_port, instructions=INSTRUCTIONS, default_namespace=settings.default_mcp_namespace, @@ -917,7 +918,7 @@ async def get_working_memory( Returns: Working memory containing messages, context, and structured memory records """ - result = await core_get_working_memory( + result = await working_memory_core.get_working_memory( session_id=session_id, recent_messages_limit=recent_messages_limit ) if result is None: From 1d240a2910206ecc08bdbb5efa6fa63c422809b0 Mon Sep 17 00:00:00 2001 
From: Piotr Zaniewski Date: Tue, 24 Feb 2026 09:40:14 +0100 Subject: [PATCH 5/6] fix: sort imports in mcp.py to satisfy ruff isort rules A bare package import (from agent_memory_server import ...) must come before submodule imports (from agent_memory_server.api import ...). This was causing CI lint and test failures on PR #157. --- agent_memory_server/mcp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent_memory_server/mcp.py b/agent_memory_server/mcp.py index 5ddca26..5e9f84d 100644 --- a/agent_memory_server/mcp.py +++ b/agent_memory_server/mcp.py @@ -5,6 +5,7 @@ import ulid from mcp.server.fastmcp import FastMCP as _FastMCPBase +from agent_memory_server import working_memory as working_memory_core from agent_memory_server.api import ( create_long_term_memory as core_create_long_term_memory, delete_long_term_memory as core_delete_long_term_memory, @@ -14,7 +15,6 @@ search_long_term_memory as core_search_long_term_memory, update_long_term_memory as core_update_long_term_memory, ) -from agent_memory_server import working_memory as working_memory_core from agent_memory_server.config import settings from agent_memory_server.dependencies import get_background_tasks from agent_memory_server.filters import ( From d1c9d478317a7e75064692b2e54d49136d37d34d Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 25 Feb 2026 10:35:50 -0700 Subject: [PATCH 6/6] fix(mcp): scope stateless flags to streamable-http transport only `stateless_http=True` was set in `FastMCP.__init__`, which applied it to all transport modes (stdio, SSE, streamable-http). Moved it to `run_streamable_http_async()` so it only takes effect for that transport. Also removed `stateless=True` from the SSE handler's `_mcp_server.run()` call, which was unintentionally changing SSE behavior. 
Co-Authored-By: Claude Opus 4.6 --- agent_memory_server/mcp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent_memory_server/mcp.py b/agent_memory_server/mcp.py index 5e9f84d..610de76 100644 --- a/agent_memory_server/mcp.py +++ b/agent_memory_server/mcp.py @@ -80,7 +80,6 @@ class FastMCP(_FastMCPBase): """Extend FastMCP to support optional URL namespace and default STDIO namespace.""" def __init__(self, *args, default_namespace=None, **kwargs): - kwargs.setdefault("stateless_http", True) super().__init__(*args, **kwargs) self.default_namespace = default_namespace self._current_request = None # Initialize the attribute @@ -108,7 +107,6 @@ async def handle_sse(request: Request) -> Response: read_stream, write_stream, self._mcp_server.create_initialization_options(), - stateless=True, ) finally: # Clean up request reference @@ -184,6 +182,8 @@ async def run_streamable_http_async(self): from agent_memory_server.utils.redis import get_redis_conn await get_redis_conn() + # Enable stateless mode only for streamable-http transport + self.settings.stateless_http = True return await super().run_streamable_http_async() async def run_stdio_async(self):