@@ -50,6 +50,16 @@ class MCPMarkAgent(BaseMCPAgent):
5050 "You are a helpful agent that uses tools iteratively to complete the user's task, "
5151 "and when finished, provides the final answer or simply states \" Task completed\" without further tool calls."
5252 )
# Prompt used when the running conversation must be summarized ("compacted")
# so a continuation request fits back inside the model's context window.
# NOTE(review): reconstructed from a garbled diff render — stray spaces after
# "\n" escapes removed; confirm against the committed source.
COMPACTION_PROMPT = (
    "You are performing a CONTEXT CHECKPOINT COMPACTION.\n"
    "Summarize the conversation so far for another model to continue.\n\n"
    "Include:\n"
    "- Current progress and key decisions made\n"
    "- Important context, constraints, or user preferences\n"
    "- What remains to be done (clear next steps)\n"
    "- Any critical data, examples, or references needed to continue\n\n"
    "Be concise and structured. Do NOT call tools."
)
5363 DEFAULT_TIMEOUT = BaseMCPAgent .DEFAULT_TIMEOUT
5464
5565 def __init__ (
@@ -62,6 +72,7 @@ def __init__(
6272 service_config : Optional [Dict [str , Any ]] = None ,
6373 service_config_provider : Optional [Callable [[], Dict [str , Any ]]] = None ,
6474 reasoning_effort : Optional [str ] = "default" ,
75+ compaction_token : int = BaseMCPAgent .COMPACTION_DISABLED_TOKEN ,
6576 ):
6677 super ().__init__ (
6778 litellm_input_model_name = litellm_input_model_name ,
@@ -72,6 +83,7 @@ def __init__(
7283 service_config = service_config ,
7384 service_config_provider = service_config_provider ,
7485 reasoning_effort = reasoning_effort ,
86+ compaction_token = compaction_token ,
7587 )
7688 logger .debug (
7789 "Initialized MCPMarkAgent for '%s' with model '%s' (Claude: %s, Thinking: %s, Reasoning: %s)" ,
@@ -303,6 +315,177 @@ async def _call_claude_native_api(
303315 return None , e .response .text
304316 except Exception as e :
305317 return None , e
318+
async def _count_claude_input_tokens(
    self,
    messages: List[Dict[str, Any]],
    tools: Optional[List[Dict]] = None,
    system: Optional[str] = None,
) -> int:
    """Return Anthropic's server-side token count for a prospective request.

    Posts to the ``/v1/messages/count_tokens`` endpoint so compaction
    decisions use the same accounting as the real Messages API call.

    Args:
        messages: Conversation in Anthropic Messages format.
        tools: Optional tool definitions to include in the count.
        system: Optional system prompt to include in the count.

    Returns:
        The reported ``input_tokens``, or 0 when absent from the response.

    Raises:
        httpx.HTTPStatusError: On a non-2xx response (caller is expected to
            catch and degrade gracefully).
    """
    import os

    api_base = os.getenv("ANTHROPIC_API_BASE", "https://api.anthropic.com")
    headers = {
        "x-api-key": self.api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload: Dict[str, Any] = {
        # Strip only a *leading* litellm-style provider prefix. The previous
        # str.replace("anthropic/", "") would also mangle the substring if it
        # appeared anywhere else in the model name.
        "model": self.litellm_input_model_name.removeprefix("anthropic/"),
        "messages": messages,
    }
    if tools:
        payload["tools"] = tools
    if system:
        payload["system"] = system

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{api_base}/v1/messages/count_tokens",
            headers=headers,
            json=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()
        data = response.json() or {}
        return int(data.get("input_tokens", 0) or 0)
352+
353+ def _extract_litellm_text (self , response : Any ) -> str :
354+ try :
355+ choices = getattr (response , "choices" , None ) or []
356+ if not choices :
357+ return ""
358+ msg = getattr (choices [0 ], "message" , None )
359+ if msg is not None :
360+ return str (getattr (msg , "content" , "" ) or "" )
361+ return str (getattr (choices [0 ], "text" , "" ) or "" )
362+ except Exception : # pragma: no cover - best effort
363+ return ""
364+
365+ def _extract_anthropic_text (self , response_json : Dict [str , Any ]) -> str :
366+ pieces : List [str ] = []
367+ for block in response_json .get ("content" , []) or []:
368+ if isinstance (block , dict ) and block .get ("type" ) == "text" :
369+ text = block .get ("text" )
370+ if text :
371+ pieces .append (str (text ))
372+ return "\n " .join (pieces ).strip ()
373+
374+ def _merge_usage (self , total_tokens : Dict [str , int ], usage : Dict [str , Any ]) -> None :
375+ try :
376+ input_tokens = int (usage .get ("input_tokens" , 0 ) or 0 )
377+ output_tokens = int (usage .get ("output_tokens" , 0 ) or 0 )
378+ total_tokens_count = int (usage .get ("total_tokens" , 0 ) or (input_tokens + output_tokens ))
379+ total_tokens ["input_tokens" ] += input_tokens
380+ total_tokens ["output_tokens" ] += output_tokens
381+ total_tokens ["total_tokens" ] += total_tokens_count
382+ except Exception : # pragma: no cover - best effort
383+ return
384+
async def _maybe_compact_litellm_messages(
    self,
    messages: List[Dict[str, Any]],
    total_tokens: Dict[str, int],
    tool_call_log_file: Optional[str],
    current_prompt_tokens: int,
) -> List[Dict[str, Any]]:
    """Compact the litellm conversation when it crosses the token threshold.

    When compaction is enabled and ``current_prompt_tokens`` has reached
    ``self.compaction_token``, ask the model to summarize the transcript and
    replace the history with [system, first user turn, summary]. On any
    summarization failure the original messages are returned unchanged so
    the caller can proceed — mirrors ``_maybe_compact_anthropic_messages``
    (this is also invoked from the ContextWindowExceeded recovery path,
    where raising would defeat the fallback).

    Args:
        messages: Full chat history (role/content dicts).
        total_tokens: Running usage accumulator, mutated in place.
        tool_call_log_file: Optional path for appending a trace line.
        current_prompt_tokens: Prompt-token estimate for ``messages``.

    Returns:
        Either the original ``messages`` or a compacted 3-message history.
    """
    if not self._compaction_enabled():
        return messages
    if current_prompt_tokens < self.compaction_token:
        return messages

    logger.info(f"| [compaction] Triggered at prompt tokens: {current_prompt_tokens:,}")
    if tool_call_log_file:
        try:
            with open(tool_call_log_file, "a", encoding="utf-8") as f:
                f.write(f"| [compaction] Triggered at prompt tokens: {current_prompt_tokens:,}\n")
        except Exception:
            pass  # trace logging is best effort

    compact_messages = [
        {"role": "system", "content": self.COMPACTION_PROMPT},
        {"role": "user", "content": json.dumps(messages, ensure_ascii=False)},
    ]
    completion_kwargs = {
        "model": self.litellm_input_model_name,
        "messages": compact_messages,
        "api_key": self.api_key,
    }
    if self.base_url:
        completion_kwargs["base_url"] = self.base_url
    try:
        response = await litellm.acompletion(**completion_kwargs)
    except Exception as exc:  # noqa: BLE001 - degrade gracefully, keep history
        logger.warning(f"| [compaction] Failed: {exc}")
        return messages

    usage = getattr(response, "usage", None)
    if usage:
        # Normalize litellm's attribute-style usage into the dict shape the
        # shared accumulator expects, then merge once (was duplicated inline).
        self._merge_usage(
            total_tokens,
            {
                "input_tokens": getattr(usage, "prompt_tokens", None)
                or getattr(usage, "input_tokens", None)
                or 0,
                "output_tokens": getattr(usage, "completion_tokens", None)
                or getattr(usage, "output_tokens", None)
                or 0,
                # 0 makes _merge_usage fall back to input + output.
                "total_tokens": getattr(usage, "total_tokens", None) or 0,
            },
        )

    summary = self._extract_litellm_text(response).strip() or "(no summary)"
    system_msg = messages[0] if messages else {"role": "system", "content": self.SYSTEM_PROMPT}
    first_user = messages[1] if len(messages) > 1 else {"role": "user", "content": ""}
    return [
        system_msg,
        first_user,
        {
            "role": "user",
            "content": f"Context summary (auto-compacted due to token limit):\n{summary}",
        },
    ]
440+
async def _maybe_compact_anthropic_messages(
    self,
    messages: List[Dict[str, Any]],
    total_tokens: Dict[str, int],
    thinking_budget: int,
    tool_call_log_file: Optional[str],
    current_input_tokens: int,
) -> List[Dict[str, Any]]:
    """Compact the Anthropic-native conversation when over the threshold.

    When compaction is enabled and ``current_input_tokens`` has reached
    ``self.compaction_token``, ask Claude to summarize the transcript and
    replace the history with [first user turn, summary]. On any failure
    the original messages are returned unchanged.

    Args:
        messages: Full Messages-API history.
        total_tokens: Running usage accumulator, mutated in place.
        thinking_budget: Thinking budget forwarded to the native API call.
        tool_call_log_file: Optional path for appending a trace line.
        current_input_tokens: Input-token count for ``messages``.

    Returns:
        Either the original ``messages`` or a compacted 2-message history.
    """
    if not self._compaction_enabled():
        return messages
    if current_input_tokens < self.compaction_token:
        return messages

    logger.info(f"| [compaction] Triggered at input tokens: {current_input_tokens:,}")
    if tool_call_log_file:
        try:
            with open(tool_call_log_file, "a", encoding="utf-8") as f:
                f.write(f"| [compaction] Triggered at input tokens: {current_input_tokens:,}\n")
        except Exception:
            pass  # trace logging is best effort

    # The Messages API takes no system role inside `messages`, so the
    # compaction instructions go in as a leading user turn instead.
    compact_messages = [
        {"role": "user", "content": self.COMPACTION_PROMPT},
        {"role": "user", "content": json.dumps(messages, ensure_ascii=False)},
    ]
    response, error_msg = await self._call_claude_native_api(
        messages=compact_messages,
        thinking_budget=thinking_budget,
        tools=None,
        system=None,
    )
    if error_msg or not response:
        logger.warning(f"| [compaction] Failed: {error_msg}")
        return messages

    # Reuse the shared accumulator instead of duplicating the merge logic
    # (Anthropic usage dicts carry input_tokens/output_tokens only, so the
    # input + output fallback applies).
    self._merge_usage(total_tokens, response.get("usage", {}) or {})

    summary = self._extract_anthropic_text(response) or "(no summary)"
    first_user = messages[0] if messages else {"role": "user", "content": ""}
    return [
        first_user,
        {"role": "user", "content": f"Context summary (auto-compacted due to token limit):\n{summary}"},
    ]
306489
307490
308491 async def _execute_anthropic_native_tool_loop (
@@ -327,9 +510,29 @@ async def _execute_anthropic_native_tool_loop(
327510 system_text = self .SYSTEM_PROMPT
328511 # Record initial state
329512 self ._update_progress (messages , total_tokens , turn_count )
330-
513+
331514 for _ in range (max_turns ):
332515 turn_count += 1
516+
517+ current_input_tokens = 0
518+ if self ._compaction_enabled ():
519+ try :
520+ current_input_tokens = await self ._count_claude_input_tokens (
521+ messages = messages ,
522+ tools = tools ,
523+ system = system_text ,
524+ )
525+ except Exception as exc : # noqa: BLE001
526+ logger .debug ("Claude token counting failed: %s" , exc )
527+
528+ messages = await self ._maybe_compact_anthropic_messages (
529+ messages = messages ,
530+ total_tokens = total_tokens ,
531+ thinking_budget = thinking_budget ,
532+ tool_call_log_file = tool_call_log_file ,
533+ current_input_tokens = current_input_tokens ,
534+ )
535+ self ._update_progress (messages , total_tokens , turn_count )
333536
334537 # Call Claude native API
335538 response , error_msg = await self ._call_claude_native_api (
@@ -584,9 +787,20 @@ async def _execute_litellm_tool_loop(
584787
585788 # Record initial state
586789 self ._update_progress (messages , total_tokens , turn_count )
587-
790+
588791 try :
589792 while turn_count < max_turns :
793+ current_prompt_tokens = 0
794+ if self ._compaction_enabled ():
795+ current_prompt_tokens = self ._count_prompt_tokens_litellm (messages )
796+
797+ messages = await self ._maybe_compact_litellm_messages (
798+ messages = messages ,
799+ total_tokens = total_tokens ,
800+ tool_call_log_file = tool_call_log_file ,
801+ current_prompt_tokens = current_prompt_tokens ,
802+ )
803+ self ._update_progress (messages , total_tokens , turn_count )
590804
591805 # Build completion kwargs
592806 completion_kwargs = {
@@ -626,7 +840,15 @@ async def _execute_litellm_tool_loop(
626840 if consecutive_failures >= max_consecutive_failures :
627841 raise
628842 if "ContextWindowExceededError" in str (e ):
629- raise
843+ # Best-effort fallback: compact and retry once.
844+ messages = await self ._maybe_compact_litellm_messages (
845+ messages = messages ,
846+ total_tokens = total_tokens ,
847+ tool_call_log_file = tool_call_log_file ,
848+ current_prompt_tokens = self .compaction_token ,
849+ )
850+ self ._update_progress (messages , total_tokens , turn_count )
851+ continue
630852 elif "RateLimitError" in str (e ):
631853 await asyncio .sleep (12 ** consecutive_failures )
632854 else :
@@ -794,12 +1016,6 @@ async def _execute_litellm_tool_loop(
7941016 }
7951017
7961018
797-
798- # ==================== Format Conversion Methods ====================
799-
800-
801-
802-
8031019 # ==================== MCP Server Management ====================
8041020
8051021 async def _create_mcp_server (self ) -> Any :
0 commit comments