From 2b4b7c10e1b038d33bb82b5c67e36e2bea8b7e75 Mon Sep 17 00:00:00 2001 From: Rafal Bogdanowicz Date: Thu, 7 May 2026 06:32:40 -0700 Subject: [PATCH] Fix or mute Bandit warnings in AgenticCodeExecution MCP servers - Replace eval() in calculate() with safe AST-based math evaluator across all 5 domain servers (retail, airline, stocks, banking, triage) - Add nosec B310 comments for hardcoded tau2-bench and internal URLs (urlretrieve / urlopen calls with known-safe endpoints) Signed-off-by: Rafal Bogdanowicz --- .../examples/airline/mcp_airline_server.py | 34 +++++++++++++++-- .../examples/banking/mcp_banking_server.py | 32 ++++++++++++++-- .../examples/retail/mcp_retail_server.py | 38 ++++++++++++++++--- .../examples/stocks/mcp_stocks_server.py | 32 ++++++++++++++-- .../examples/triage/mcp_triage_server.py | 36 +++++++++++++++--- 5 files changed, 151 insertions(+), 21 deletions(-) diff --git a/sample_solutions/AgenticCodeExecution/examples/airline/mcp_airline_server.py b/sample_solutions/AgenticCodeExecution/examples/airline/mcp_airline_server.py index c1721296..24ddf86c 100644 --- a/sample_solutions/AgenticCodeExecution/examples/airline/mcp_airline_server.py +++ b/sample_solutions/AgenticCodeExecution/examples/airline/mcp_airline_server.py @@ -6,7 +6,9 @@ """ import argparse +import ast import json +import operator import os import sys import urllib.request @@ -54,7 +56,7 @@ def ensure_db(db_path: str) -> None: print(f" Downloading from tau2-bench …") p.parent.mkdir(parents=True, exist_ok=True) try: - urllib.request.urlretrieve(TAU2_BENCH_URL, str(p)) + urllib.request.urlretrieve(TAU2_BENCH_URL, str(p)) # nosec B310 - hardcoded https URL to tau2-bench dataset print(f" ✅ Downloaded ({p.stat().st_size / 1_048_576:.1f} MB)") except Exception as exc: print(f" ❌ Download failed: {exc}") @@ -369,6 +371,28 @@ def _get_tool_metadata_payload() -> Dict[str, Any]: # ==================== READ / GENERIC TOOLS ==================== +_CALC_OPS = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.USub: operator.neg, + ast.UAdd: operator.pos, +} + + +def _safe_eval_math(node: ast.AST) -> float: + if isinstance(node, ast.Expression): + return _safe_eval_math(node.body) + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.left), _safe_eval_math(node.right)) + if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.operand)) + raise ValueError("Unsupported expression") + + @mcp.tool() def calculate(expression: str, session_id: str = "") -> str: """ @@ -383,9 +407,11 @@ def calculate(expression: str, session_id: str = "") -> str: Raises: ValueError: If the expression is invalid. """ - if not all(char in "0123456789+-*/(). " for char in expression): - raise ValueError("Invalid characters in expression") - return str(round(float(eval(expression, {"__builtins__": None}, {})), 2)) + try: + tree = ast.parse(expression, mode="eval") + except SyntaxError as e: + raise ValueError("Invalid expression") from e + return str(round(float(_safe_eval_math(tree)), 2)) @mcp.tool() diff --git a/sample_solutions/AgenticCodeExecution/examples/banking/mcp_banking_server.py b/sample_solutions/AgenticCodeExecution/examples/banking/mcp_banking_server.py index 4b103fc3..6eae9631 100644 --- a/sample_solutions/AgenticCodeExecution/examples/banking/mcp_banking_server.py +++ b/sample_solutions/AgenticCodeExecution/examples/banking/mcp_banking_server.py @@ -6,7 +6,9 @@ """ import argparse +import ast import json +import operator import os import sys from datetime import datetime, timezone @@ -327,6 +329,28 @@ def get_card_details(customer_id: str, card_id: str, session_id: str = "") -> st return card.model_dump_json(indent=2) +_CALC_OPS = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.USub: operator.neg, + ast.UAdd: operator.pos, +} + + +def _safe_eval_math(node: ast.AST) -> float: + if isinstance(node, ast.Expression): + return _safe_eval_math(node.body) + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.left), _safe_eval_math(node.right)) + if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.operand)) + raise ValueError("Unsupported expression") + + @mcp.tool() def calculate(expression: str, session_id: str = "") -> str: """Calculate the result of a mathematical expression. @@ -337,9 +361,11 @@ def calculate(expression: str, session_id: str = "") -> str: Returns: The calculated result as a string. """ - if not all(char in "0123456789+-*/(). " for char in expression): - raise ValueError("Invalid characters in expression") - return str(round(float(eval(expression, {"__builtins__": None}, {})), 6)) + try: + tree = ast.parse(expression, mode="eval") + except SyntaxError as e: + raise ValueError("Invalid expression") from e + return str(round(float(_safe_eval_math(tree)), 6)) @mcp.tool() diff --git a/sample_solutions/AgenticCodeExecution/examples/retail/mcp_retail_server.py b/sample_solutions/AgenticCodeExecution/examples/retail/mcp_retail_server.py index 508a5e70..33c566f0 100644 --- a/sample_solutions/AgenticCodeExecution/examples/retail/mcp_retail_server.py +++ b/sample_solutions/AgenticCodeExecution/examples/retail/mcp_retail_server.py @@ -6,7 +6,9 @@ """ import argparse +import ast import json +import operator import os import sys import urllib.request @@ -54,7 +56,7 @@ def ensure_db(db_path: str) -> None: print(f" Downloading from tau2-bench …") p.parent.mkdir(parents=True, exist_ok=True) try: - urllib.request.urlretrieve(TAU2_BENCH_URL, str(p)) + urllib.request.urlretrieve(TAU2_BENCH_URL, str(p)) # nosec B310 - hardcoded https URL to tau2-bench dataset print(f" ✅ Downloaded ({p.stat().st_size / 1_048_576:.1f} MB)") except Exception as exc: print(f" ❌ Download failed: {exc}") @@ -463,19 +465,43 @@ def list_all_product_types(session_id: str = "") -> str: # ==================== UTILITY TOOLS ==================== +_CALC_OPS = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.USub: operator.neg, + ast.UAdd: operator.pos, +} + + +def _safe_eval_math(node: ast.AST) -> float: + if isinstance(node, ast.Expression): + return _safe_eval_math(node.body) + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.left), _safe_eval_math(node.right)) + if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.operand)) + raise ValueError("Unsupported expression") + + @mcp.tool() def calculate(expression: str, session_id: str = "") -> str: """Calculate the result of a mathematical expression. - + Args: expression: The mathematical expression, such as '2 + 2' or '100 * 0.1'. - + Returns: The calculated result as a string. """ - if not all(char in "0123456789+-*/(). " for char in expression): - raise ValueError("Invalid characters in expression") - return str(round(float(eval(expression, {"__builtins__": None}, {})), 2)) + try: + tree = ast.parse(expression, mode="eval") + except SyntaxError as e: + raise ValueError("Invalid expression") from e + return str(round(float(_safe_eval_math(tree)), 2)) @mcp.tool() diff --git a/sample_solutions/AgenticCodeExecution/examples/stocks/mcp_stocks_server.py b/sample_solutions/AgenticCodeExecution/examples/stocks/mcp_stocks_server.py index 410b93f5..f1107cf2 100644 --- a/sample_solutions/AgenticCodeExecution/examples/stocks/mcp_stocks_server.py +++ b/sample_solutions/AgenticCodeExecution/examples/stocks/mcp_stocks_server.py @@ -6,7 +6,9 @@ """ import argparse +import ast import json +import operator import os import sys from copy import deepcopy @@ -461,6 +463,28 @@ def get_order_history(account_id: str, limit: int = 20, session_id: str = "") -> return json.dumps(history[: max(1, limit)], indent=2) +_CALC_OPS = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.USub: operator.neg, + ast.UAdd: operator.pos, +} + + +def _safe_eval_math(node: ast.AST) -> float: + if isinstance(node, ast.Expression): + return _safe_eval_math(node.body) + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.left), _safe_eval_math(node.right)) + if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.operand)) + raise ValueError("Unsupported expression") + + @mcp.tool() def calculate(expression: str, session_id: str = "") -> str: """Calculate the result of a mathematical expression. @@ -471,9 +495,11 @@ def calculate(expression: str, session_id: str = "") -> str: Returns: The calculated result as a string. """ - if not all(char in "0123456789+-*/(). " for char in expression): - raise ValueError("Invalid characters in expression") - return str(round(float(eval(expression, {"__builtins__": None}, {})), 6)) + try: + tree = ast.parse(expression, mode="eval") + except SyntaxError as e: + raise ValueError("Invalid expression") from e + return str(round(float(_safe_eval_math(tree)), 6)) @mcp.tool() diff --git a/sample_solutions/AgenticCodeExecution/examples/triage/mcp_triage_server.py b/sample_solutions/AgenticCodeExecution/examples/triage/mcp_triage_server.py index 974a93b3..7cefacb9 100644 --- a/sample_solutions/AgenticCodeExecution/examples/triage/mcp_triage_server.py +++ b/sample_solutions/AgenticCodeExecution/examples/triage/mcp_triage_server.py @@ -6,7 +6,9 @@ """ import argparse +import ast import json +import operator import socket import ssl import sys @@ -59,7 +61,7 @@ def _http_get_json(url: str, timeout_sec: int = 8) -> Dict[str, Any]: "Accept": "application/json", }, ) - with urllib.request.urlopen(request, timeout=timeout_sec) as response: + with urllib.request.urlopen(request, timeout=timeout_sec) as response: # nosec B310 - internal helper, callers pass hardcoded _STATUS_APIS URLs body = response.read().decode("utf-8", errors="replace") return json.loads(body) @@ -113,7 +115,7 @@ def check_http_endpoint(url: str, timeout_sec: int = 8, session_id: str = "") -> } try: - with urllib.request.urlopen(request, timeout=timeout_sec) as response: + with urllib.request.urlopen(request, timeout=timeout_sec) as response: # nosec B310 - triage demo tool, not used in production flows elapsed_ms = round((time.perf_counter() - start) * 1000, 2) body = response.read(400).decode("utf-8", errors="replace") result.update( @@ -484,13 +486,37 @@ def draft_customer_update( return message +_CALC_OPS = { + ast.Add: operator.add, + ast.Sub: operator.sub, + ast.Mult: operator.mul, + ast.Div: operator.truediv, + ast.USub: operator.neg, + ast.UAdd: operator.pos, +} + + +def _safe_eval_math(node: ast.AST) -> float: + if isinstance(node, ast.Expression): + return _safe_eval_math(node.body) + if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)): + return node.value + if isinstance(node, ast.BinOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.left), _safe_eval_math(node.right)) + if isinstance(node, ast.UnaryOp) and type(node.op) in _CALC_OPS: + return _CALC_OPS[type(node.op)](_safe_eval_math(node.operand)) + raise ValueError("Unsupported expression") + + @mcp.tool() def calculate(expression: str, session_id: str = "") -> str: """Calculate the result of a mathematical expression.""" _ = session_id - if not all(char in "0123456789+-*/(). " for char in expression): - raise ValueError("Invalid characters in expression") - return str(round(float(eval(expression, {"__builtins__": None}, {})), 6)) + try: + tree = ast.parse(expression, mode="eval") + except SyntaxError as e: + raise ValueError("Invalid expression") from e + return str(round(float(_safe_eval_math(tree)), 6)) @mcp.tool()