Skip to content

Commit 3e4ac86

Browse files
committed
remove adk LivenessProbe and update mcpSessionManager
1 parent 0c23467 commit 3e4ac86

3 files changed

Lines changed: 141 additions & 21 deletions

File tree

go/internal/controller/translator/agent/adk_api_translator.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -524,15 +524,6 @@ func (a *adkApiTranslator) buildManifest(
524524
TimeoutSeconds: 15,
525525
PeriodSeconds: 15,
526526
},
527-
LivenessProbe: &corev1.Probe{
528-
ProbeHandler: corev1.ProbeHandler{
529-
HTTPGet: &corev1.HTTPGetAction{Path: "/healthz/mcp", Port: intstr.FromString("http")},
530-
},
531-
InitialDelaySeconds: 30,
532-
PeriodSeconds: 30,
533-
TimeoutSeconds: 10,
534-
FailureThreshold: 3,
535-
},
536527
SecurityContext: securityContext,
537528
VolumeMounts: volumeMounts,
538529
}},

python/packages/kagent-adk/src/kagent/adk/_mcp_toolset.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
logger = logging.getLogger("kagent_adk." + __name__)
1414

1515
_PING_TIMEOUT_SECONDS = 2.0
16+
_SESSION_REVALIDATE_TIMEOUT_SECONDS = 5.0
1617
_JSONRPC_METHOD_NOT_FOUND = -32601
1718

1819

@@ -28,8 +29,17 @@ def _is_server_alive_error(exc: Exception) -> bool:
2829
return False
2930

3031

32+
def _is_session_invalid_error(exc: Exception) -> bool:
    """Return True if the error indicates the MCP session is no longer valid.

    The text inspected is ``str(exc)`` plus, for an ``McpError``, its
    ``error.message``. The session is considered invalid when that text
    (case-insensitively) contains ``"session terminated"`` or a standalone
    ``404`` status number.

    Args:
        exc: The exception raised while talking to the MCP server.

    Returns:
        True when the error text looks like an invalid-session response,
        False otherwise.
    """
    import re  # local import so this helper does not depend on the module header

    parts = [str(exc)]
    if isinstance(exc, McpError) and exc.error.message:
        parts.append(exc.error.message)
    msg = " ".join(parts).lower()

    if "session terminated" in msg:
        return True
    # Require 404 to stand alone as a number: a plain substring test would
    # also fire on ports, ids or byte counts such as "4040" or "14045" and
    # tear down a perfectly healthy session.
    return re.search(r"(?<!\d)404(?!\d)", msg) is not None
39+
40+
3141
class KAgentMCPSessionManager(MCPSessionManager):
32-
"""Session manager that validates cached sessions via ping before reuse.
42+
"""Session manager that validates cached sessions via ping and list_tools before reuse.
3343
3444
The upstream ``MCPSessionManager`` checks ``_read_stream._closed`` /
3545
``_write_stream._closed`` to decide whether a cached session is still
@@ -38,26 +48,42 @@ class KAgentMCPSessionManager(MCPSessionManager):
3848
stale ``mcp-session-id`` is sent to the new server, which replies
3949
with HTTP 404 → ``"Session terminated"``.
4050
41-
This subclass adds a lightweight ``send_ping()`` probe after the
42-
parent returns a cached session. If the ping fails the cached
43-
session is torn down and a brand-new one is created transparently.
51+
This subclass: (1) runs ``send_ping()`` after the parent returns a cached
52+
session; (2) then revalidates the session with ``list_tools()`` so that
53+
if the server restarted and the session id is invalid (404), we prune
54+
the session from the cache and create a new one.
4455
"""
4556

4657
async def create_session(self, headers: dict[str, str] | None = None) -> ClientSession:
    """Return a session that has been validated with ping and ``list_tools()``.

    The session handed back by the parent manager is probed in two steps:

    1. ``send_ping()`` bounded by ``_PING_TIMEOUT_SECONDS``. A failure that
       ``_is_server_alive_error`` accepts is treated as "server is alive"
       and validation continues; any other failure discards the session.
    2. ``list_tools()`` bounded by ``_SESSION_REVALIDATE_TIMEOUT_SECONDS``.
       A failure that ``_is_session_invalid_error`` recognises discards the
       session; any other failure is re-raised to the caller.

    Args:
        headers: Optional headers forwarded unchanged to the parent
            ``create_session``.

    Returns:
        The validated session, or a freshly created replacement when the
        first one was discarded. The replacement is returned as-is, without
        being probed again.

    Raises:
        Exception: Any ``list_tools()`` error (including a timeout) that is
            not recognised as an invalid-session error.
    """
    session = await super().create_session(headers)

    try:
        await asyncio.wait_for(session.send_ping(), timeout=_PING_TIMEOUT_SECONDS)
    except Exception as exc:
        if not _is_server_alive_error(exc):
            logger.warning(
                "MCP session failed ping validation, invalidating cached session and creating a fresh one"
            )
            return await self._discard_and_recreate_session(headers)
        # Otherwise the server answered, so fall through to revalidation.

    try:
        await asyncio.wait_for(session.list_tools(), timeout=_SESSION_REVALIDATE_TIMEOUT_SECONDS)
    except Exception as exc:
        if not _is_session_invalid_error(exc):
            # Transient problems (e.g. timeouts) are the caller's to handle.
            raise
        logger.warning("MCP session invalid (e.g. 404), pruning from cache and creating a fresh one")
        return await self._discard_and_recreate_session(headers)

    return session

async def _discard_and_recreate_session(self, headers: dict[str, str] | None) -> ClientSession:
    """Close this manager (best effort) and ask the parent for a new session."""
    try:
        await self.close()
    except Exception as close_exc:
        # Teardown of a stale session must never block recovery.
        logger.debug("Non-fatal error while closing stale session: %s", close_exc)
    return await super().create_session(headers)
6187

6288

6389
def _enrich_cancelled_error(error: BaseException) -> asyncio.CancelledError:

python/packages/kagent-adk/tests/unittests/test_mcp_session_recovery.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ async def test_create_session_returns_session_when_ping_succeeds():
2626

2727
mock_session = AsyncMock()
2828
mock_session.send_ping = AsyncMock(return_value=None)
29+
mock_session.list_tools = AsyncMock(return_value=[])
2930

3031
with patch.object(
3132
KAgentMCPSessionManager.__bases__[0],
@@ -37,6 +38,7 @@ async def test_create_session_returns_session_when_ping_succeeds():
3738

3839
assert result is mock_session
3940
mock_session.send_ping.assert_awaited_once()
41+
mock_session.list_tools.assert_awaited_once()
4042
parent_create.assert_awaited_once()
4143

4244

@@ -49,6 +51,7 @@ async def test_create_session_invalidates_and_retries_when_ping_fails():
4951

5052
fresh_session = AsyncMock()
5153
fresh_session.send_ping = AsyncMock(return_value=None)
54+
fresh_session.list_tools = AsyncMock(return_value=[])
5255

5356
call_count = 0
5457

@@ -100,6 +103,7 @@ async def _slow_ping():
100103

101104
fresh_session = AsyncMock()
102105
fresh_session.send_ping = AsyncMock(return_value=None)
106+
fresh_session.list_tools = AsyncMock(return_value=[])
103107

104108
call_count = 0
105109

@@ -133,6 +137,7 @@ async def test_create_session_accepts_method_not_found_as_alive():
133137
mock_session.send_ping = AsyncMock(
134138
side_effect=McpError(error=ErrorData(code=-32601, message="Method not found: ping"))
135139
)
140+
mock_session.list_tools = AsyncMock(return_value=[])
136141

137142
with patch.object(
138143
KAgentMCPSessionManager.__bases__[0],
@@ -143,9 +148,105 @@ async def test_create_session_accepts_method_not_found_as_alive():
143148
result = await mgr.create_session()
144149

145150
assert result is mock_session
151+
mock_session.list_tools.assert_awaited_once()
146152
parent_create.assert_awaited_once()
147153

148154

155+
@pytest.mark.asyncio
async def test_create_session_invalidates_when_list_tools_returns_session_terminated():
    """A session that pings fine but fails list_tools with "Session terminated" is pruned and replaced."""
    manager = _make_manager()

    # Cached session: ping succeeds, revalidation reports the session is gone.
    stale_session = AsyncMock()
    stale_session.send_ping = AsyncMock(return_value=None)
    stale_session.list_tools = AsyncMock(side_effect=Exception("Session terminated"))

    # Replacement session: healthy on both probes.
    fresh_session = AsyncMock()
    fresh_session.send_ping = AsyncMock(return_value=None)
    fresh_session.list_tools = AsyncMock(return_value=[])

    # The parent hands out the stale session first, then the replacement.
    parent_cls = KAgentMCPSessionManager.__bases__[0]
    with (
        patch.object(
            parent_cls,
            "create_session",
            new_callable=AsyncMock,
            side_effect=[stale_session, fresh_session],
        ) as parent_create,
        patch.object(manager, "close", new_callable=AsyncMock) as close_mock,
    ):
        session = await manager.create_session()

    assert session is fresh_session
    close_mock.assert_awaited_once()
    assert parent_create.await_count == 2
    stale_session.list_tools.assert_awaited_once()
191+
192+
193+
@pytest.mark.asyncio
async def test_create_session_invalidates_when_list_tools_returns_404():
    """A 404 McpError from list_tools marks the session invalid: prune it and create a new one."""
    manager = _make_manager()

    not_found = McpError(error=ErrorData(code=-32000, message="404 Not Found"))

    # Cached session: ping succeeds, list_tools is rejected with a 404.
    stale_session = AsyncMock()
    stale_session.send_ping = AsyncMock(return_value=None)
    stale_session.list_tools = AsyncMock(side_effect=not_found)

    # Replacement session: healthy on both probes.
    fresh_session = AsyncMock()
    fresh_session.send_ping = AsyncMock(return_value=None)
    fresh_session.list_tools = AsyncMock(return_value=[])

    # The parent hands out the stale session first, then the replacement.
    parent_cls = KAgentMCPSessionManager.__bases__[0]
    with (
        patch.object(
            parent_cls,
            "create_session",
            new_callable=AsyncMock,
            side_effect=[stale_session, fresh_session],
        ) as parent_create,
        patch.object(manager, "close", new_callable=AsyncMock) as close_mock,
    ):
        session = await manager.create_session()

    assert session is fresh_session
    close_mock.assert_awaited_once()
    assert parent_create.await_count == 2
    stale_session.list_tools.assert_awaited_once()
229+
230+
231+
@pytest.mark.asyncio
async def test_create_session_propagates_non_session_error_from_list_tools():
    """A list_tools failure that is not a session error (here a timeout) reaches the caller unchanged."""
    manager = _make_manager()

    # Ping succeeds; revalidation fails with a transient error.
    session = AsyncMock()
    session.send_ping = AsyncMock(return_value=None)
    session.list_tools = AsyncMock(side_effect=asyncio.TimeoutError())

    parent_patch = patch.object(
        KAgentMCPSessionManager.__bases__[0],
        "create_session",
        new_callable=AsyncMock,
        return_value=session,
    )
    with parent_patch, pytest.raises(asyncio.TimeoutError):
        await manager.create_session()
248+
249+
149250
@pytest.mark.asyncio
150251
async def test_create_session_recovers_even_when_close_raises():
151252
"""Recovery must succeed even if close() raises during stale session teardown."""
@@ -155,6 +256,8 @@ async def test_create_session_recovers_even_when_close_raises():
155256
stale_session.send_ping = AsyncMock(side_effect=Exception("Session terminated"))
156257

157258
fresh_session = AsyncMock()
259+
fresh_session.send_ping = AsyncMock(return_value=None)
260+
fresh_session.list_tools = AsyncMock(return_value=[])
158261

159262
call_count = 0
160263

0 commit comments

Comments
 (0)