-
Notifications
You must be signed in to change notification settings - Fork 37
feat(mcp): add streamable-http transport mode #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f18eb7b
42e5799
2ea6ca4
6f3fe08
62d66e2
3975012
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -319,8 +319,8 @@ def api(port: int, host: str, reload: bool, no_worker: bool, task_backend: str): | |
| @click.option( | ||
| "--mode", | ||
| default="stdio", | ||
| help="Run the MCP server in SSE or stdio mode", | ||
| type=click.Choice(["stdio", "sse"]), | ||
| help="Run the MCP server in SSE, streamable-http, or stdio mode", | ||
| type=click.Choice(["stdio", "sse", "streamable-http"]), | ||
| ) | ||
| @click.option( | ||
| "--task-backend", | ||
|
|
@@ -362,6 +362,9 @@ async def setup_and_run(): | |
| if mode == "sse": | ||
| logger.info(f"Starting MCP server on port {port}\n") | ||
| await mcp_app.run_sse_async() | ||
| elif mode == "streamable-http": | ||
| logger.info(f"Starting MCP server (streamable HTTP) on port {port}\n") | ||
| await mcp_app.run_streamable_http_async() | ||
|
Comment on lines
+365
to
+367
|
||
| elif mode == "stdio": | ||
| await mcp_app.run_stdio_async() | ||
| else: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,11 +5,11 @@ | |
| import ulid | ||
| from mcp.server.fastmcp import FastMCP as _FastMCPBase | ||
|
|
||
| from agent_memory_server import working_memory as working_memory_core | ||
| from agent_memory_server.api import ( | ||
| create_long_term_memory as core_create_long_term_memory, | ||
| delete_long_term_memory as core_delete_long_term_memory, | ||
| get_long_term_memory as core_get_long_term_memory, | ||
| get_working_memory as core_get_working_memory, | ||
| memory_prompt as core_memory_prompt, | ||
| put_working_memory_core as core_put_working_memory, | ||
| search_long_term_memory as core_search_long_term_memory, | ||
|
|
@@ -77,6 +77,7 @@ class FastMCP(_FastMCPBase): | |
| """Extend FastMCP to support optional URL namespace and default STDIO namespace.""" | ||
|
|
||
| def __init__(self, *args, default_namespace=None, **kwargs): | ||
| kwargs.setdefault("stateless_http", True) | ||
| super().__init__(*args, **kwargs) | ||
| self.default_namespace = default_namespace | ||
| self._current_request = None # Initialize the attribute | ||
|
|
@@ -85,11 +86,12 @@ def sse_app(self): | |
| from mcp.server.sse import SseServerTransport | ||
| from starlette.applications import Starlette | ||
| from starlette.requests import Request | ||
| from starlette.responses import Response | ||
| from starlette.routing import Mount, Route | ||
|
|
||
| sse = SseServerTransport(self.settings.message_path) | ||
|
|
||
| async def handle_sse(request: Request) -> None: | ||
| async def handle_sse(request: Request) -> Response: | ||
| # Store the request in the FastMCP instance so call_tool can access it | ||
| self._current_request = request | ||
|
|
||
|
|
@@ -103,10 +105,12 @@ async def handle_sse(request: Request) -> None: | |
| read_stream, | ||
| write_stream, | ||
| self._mcp_server.create_initialization_options(), | ||
| stateless=True, | ||
| ) | ||
| finally: | ||
| # Clean up request reference | ||
| self._current_request = None | ||
| return Response() | ||
|
|
||
| return Starlette( | ||
| debug=self.settings.debug, | ||
|
|
@@ -172,6 +176,13 @@ async def run_sse_async(self): | |
| uvicorn.Config(app, host="0.0.0.0", port=int(self.settings.port)) | ||
| ).serve() | ||
|
|
||
| async def run_streamable_http_async(self): | ||
| """Start streamable HTTP MCP server.""" | ||
| from agent_memory_server.utils.redis import get_redis_conn | ||
|
|
||
| await get_redis_conn() | ||
| return await super().run_streamable_http_async() | ||
|
Comment on lines
+179
to
+184
|
||
|
|
||
| async def run_stdio_async(self): | ||
| """Start STDIO MCP server.""" | ||
| from agent_memory_server.utils.redis import get_redis_conn | ||
|
|
@@ -188,6 +199,7 @@ async def run_stdio_async(self): | |
|
|
||
| mcp_app = FastMCP( | ||
| "Redis Agent Memory Server", | ||
| host=settings.mcp_host, | ||
| port=settings.mcp_port, | ||
| instructions=INSTRUCTIONS, | ||
| default_namespace=settings.default_mcp_namespace, | ||
|
|
@@ -899,9 +911,12 @@ async def get_working_memory( | |
| Returns: | ||
| Working memory containing messages, context, and structured memory records | ||
| """ | ||
| return await core_get_working_memory( | ||
| result = await working_memory_core.get_working_memory( | ||
| session_id=session_id, recent_messages_limit=recent_messages_limit | ||
| ) | ||
| if result is None: | ||
| return WorkingMemory(session_id=session_id, messages=[], memories=[]) | ||
| return result | ||
|
|
||
|
|
||
| @mcp_app.tool() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The CLI help/choices are updated to include
streamable-http, but the repo documentation still describes MCP--modeas[stdio|sse](e.g.,docs/cli.md,docs/getting-started.md,docs/quick-start.md). Please update those docs (and any examples) to includestreamable-httpso users don’t follow outdated instructions.