Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions examples/eval/reliability_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,19 @@ def search_web(query: str) -> str:


def calculate(expression: str) -> str:
"""Calculate a math expression."""
return str(eval(expression))
"""Calculate a math expression safely."""
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +21 to +24
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
Comment on lines +25 to +30
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Split the one-line evaluator branches.

Ruff is already flagging Lines 26-29 with E701, so this copy will fail the configured lint check until each branch is expanded onto its own lines.

Suggested cleanup
     def _ev(n):
-        if isinstance(n, ast.Expression): return _ev(n.body)
-        if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
-        if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
-        if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+        if isinstance(n, ast.Expression):
+            return _ev(n.body)
+        if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)):
+            return n.value
+        if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
+            return _OPS[type(n.op)](_ev(n.operand))
+        if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+            return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
         raise ValueError(f"Unsupported: {ast.dump(n)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
def _ev(n):
if isinstance(n, ast.Expression):
return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)):
return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
🧰 Tools
🪛 Ruff (0.15.15)

[error] 26-26: Multiple statements on one line (colon)

(E701)


[error] 27-27: Multiple statements on one line (colon)

(E701)


[error] 28-28: Multiple statements on one line (colon)

(E701)


[error] 29-29: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/eval/reliability_example.py` around lines 25 - 30, The helper
function _ev currently uses multiple single-line if/return branches which
trigger E701; expand each conditional into its own multi-line block inside _ev
(e.g., convert "if isinstance(n, ast.Expression): return _ev(n.body)" into a
standard two-line if with the return on the next line) for all branches handling
ast.Expression, ast.Constant, ast.UnaryOp, and ast.BinOp (and keep the final
raise ValueError as the fallback). Preserve the same logic and the use of _OPS
for operator handling while only changing the formatting to satisfy the linter.

return str(_ev(ast.parse(expression, mode="eval")))


# Check if we have an API key
Expand Down
39 changes: 31 additions & 8 deletions examples/mcp/multi_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,43 @@

def calculate(expression: str) -> str:
"""
A simple calculator tool that evaluates mathematical expressions.
A simple calculator tool that evaluates mathematical expressions safely.

Args:
expression: A mathematical expression to evaluate (e.g., "2 + 2")

Returns:
The result of the calculation
"""
import ast
import operator

_OPS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.Pow: operator.pow,
ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
Comment on lines +33 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

    _OPS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }


def _safe_eval(node):
if isinstance(node, ast.Expression):
return _safe_eval(node.body)
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_safe_eval(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_safe_eval(node.left), _safe_eval(node.right))
raise ValueError(f"Unsupported expression: {ast.dump(node)}")
Comment on lines +45 to +54
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In Python, is boola subclass ofint, and does isinstance(True, (int, float))returnTrue?

💡 Result:

Yes, in Python, bool is a subclass of int [1][2][3][4]. This relationship exists for backward compatibility, as boolean values were historically represented as 0 and 1 before the formal bool type was introduced [4][5]. Because bool is a subclass of int, the function isinstance(True, (int, float)) returns True [2][5]. This is because isinstance checks if the object is an instance of the specified class or any of its subclasses [6]. Since True is an instance of bool, and bool is a subclass of int, the check against int succeeds [2][4][5].

Citations:


Exclude bool from numeric literal allowlist in _safe_eval
Line 48 uses isinstance(node.value, (int, float)), which accepts True/False because bool is a subclass of int. Use an exact type check (type(node.value) in (int, float)), or add an explicit not isinstance(node.value, bool) guard.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/mcp/multi_server.py` around lines 45 - 54, In _safe_eval, the
numeric-literal check currently allows bools because isinstance(node.value,
(int, float)) accepts True/False; change the guard in the Constant branch to
reject booleans by using an exact type check or an explicit bool exclusion
(e.g., replace isinstance(node.value, (int, float)) with type(node.value) in
(int, float) or add "and not isinstance(node.value, bool)") so only real
ints/floats are evaluated; update the Constant branch in function _safe_eval
accordingly.


try:
# Safe evaluation of mathematical expressions
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
return "Error: Invalid characters in expression"
result = eval(expression)
tree = ast.parse(expression, mode="eval")
result = _safe_eval(tree)
return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
Expand Down
18 changes: 12 additions & 6 deletions examples/mcp_server/custom_tools_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ def greet(name: str) -> str:
def calculate(expression: str) -> str:
"""Safely evaluate a mathematical expression."""
try:
# Only allow safe math operations
allowed = set("0123456789+-*/.(). ")
if not all(c in allowed for c in expression):
return "Error: Invalid characters in expression"
result = eval(expression) # Safe due to character filtering
return f"Result: {result}"
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +28 to +31
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
return f"Result: {_ev(ast.parse(expression, mode='eval'))}"
Comment on lines +32 to +38
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Expand the one-line if branches.

Ruff is already reporting E701 on Lines 33-36, so this block will keep failing lint until each branch is split onto its own lines.

🧰 Tools
🪛 Ruff (0.15.15)

[error] 33-33: Multiple statements on one line (colon)

(E701)


[error] 34-34: Multiple statements on one line (colon)

(E701)


[error] 35-35: Multiple statements on one line (colon)

(E701)


[error] 36-36: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/mcp_server/custom_tools_server.py` around lines 32 - 38, The nested
evaluator function _ev currently uses multiple single-line if statements causing
E701; expand each one-line branch into a proper multi-line if/elif/else block
inside the _ev function (handle ast.Expression -> return _ev(n.body),
ast.Constant numeric -> return n.value, ast.UnaryOp with operator in _OPS ->
return _OPS[type(n.op)](_ev(n.operand)), ast.BinOp with operator in _OPS ->
return _OPS[type(n.op)](_ev(n.left), _ev(n.right)), and the final raise
ValueError) so that every branch is on its own lines while keeping usage of _OPS
and ast.parse(expression, mode='eval') unchanged.

except Exception as e:
return f"Error: {e}"

Expand Down
31 changes: 29 additions & 2 deletions examples/python/a2a/a2a-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,36 @@ def search_web(query: str) -> str:
return f"Search results for: {query}"

def calculate(expression: str) -> str:
"""Calculate a mathematical expression."""
"""Calculate a mathematical expression safely."""
import ast
import operator

_OPS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.Pow: operator.pow,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Exponentiation operator allows computational DoS

ast.Pow is whitelisted without any guard on operand magnitude. An expression like 9**9**9 evaluates right-associatively as 9**(9**9) = 9**387420489, which Python will attempt to represent as an arbitrary-precision integer with roughly 370 million decimal digits (~1.5 GB of memory). Because operator.pow is called directly, there is no recursion depth limit or size check, so a single crafted request can OOM or stall any of the 9 server processes that include this evaluator (a2a-server.py, mcp-tools-server.py, mcp-tools-server-sse.py, multi_server.py, custom_tools_server.py, mcp_server_example.py, and others).

ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
Comment on lines +28 to +38
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

    _OPS = {
        ast.Add: operator.add,
        ast.Sub: operator.sub,
        ast.Mult: operator.mul,
        ast.Div: operator.truediv,
        ast.FloorDiv: operator.floordiv,
        ast.Mod: operator.mod,
        ast.USub: operator.neg,
        ast.UAdd: operator.pos,
    }


def _eval(node):
if isinstance(node, ast.Expression):
return _eval(node.body)
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_eval(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
raise ValueError(f"Unsupported expression: {ast.dump(node)}")

try:
return f"Result: {eval(expression)}"
tree = ast.parse(expression, mode="eval")
return f"Result: {_eval(tree)}"
Comment on lines +28 to +53
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Bound expensive arithmetic before exposing this over A2A.

This server is network-reachable, and the unrestricted ** support here still lets a caller burn CPU or memory with inputs like 9**9999999. Please either remove ast.Pow from the allowlist for the server-facing examples or enforce strict operand/result limits before evaluating it.

Minimal hardening if exponentiation is not required
     _OPS = {
         ast.Add: operator.add,
         ast.Sub: operator.sub,
         ast.Mult: operator.mul,
         ast.Div: operator.truediv,
         ast.FloorDiv: operator.floordiv,
         ast.Mod: operator.mod,
-        ast.Pow: operator.pow,
         ast.USub: operator.neg,
         ast.UAdd: operator.pos,
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
_OPS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.Pow: operator.pow,
ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
def _eval(node):
if isinstance(node, ast.Expression):
return _eval(node.body)
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_eval(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
raise ValueError(f"Unsupported expression: {ast.dump(node)}")
try:
return f"Result: {eval(expression)}"
tree = ast.parse(expression, mode="eval")
return f"Result: {_eval(tree)}"
_OPS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
def _eval(node):
if isinstance(node, ast.Expression):
return _eval(node.body)
if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
return node.value
if isinstance(node, ast.UnaryOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_eval(node.operand))
if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
raise ValueError(f"Unsupported expression: {ast.dump(node)}")
try:
tree = ast.parse(expression, mode="eval")
return f"Result: {_eval(tree)}"
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/a2a/a2a-server.py` around lines 28 - 53, The eval allows
unbounded exponentiation via ast.Pow in the _OPS mapping and _eval, which can be
abused; either remove ast.Pow from _OPS (and any handling in _eval) to disable
** entirely for the server, or keep ast.Pow but add strict pre-checks inside
_eval before computing pow: validate operand types and enforce numeric bounds
(e.g., limit absolute base and exponent to small integers and/or limit exponent
>= 0 and <= a safe max like 10, and fail if result would exceed a safe size) and
catch/raise on violations; update references in the _OPS dict and the _eval
function to implement the chosen mitigation.

except Exception:
return "Invalid expression"

Expand Down
13 changes: 12 additions & 1 deletion examples/python/custom_tools/tools_with_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,18 @@ def calculate_with_tracking(expression: str) -> str:
time.sleep(0.05)

try:
result = eval(expression)
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +53 to +56
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
Comment on lines +57 to +62
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject booleans explicitly and expand the one-line branches.

isinstance(n.value, (int, float)) accepts True/False because bool is a subclass of int, so this currently permits non-numeric literals despite the intended allowlist. Lines 58-61 also trip Ruff E701.

Suggested fix
         def _ev(n):
-            if isinstance(n, ast.Expression): return _ev(n.body)
-            if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
-            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
-            if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+            if isinstance(n, ast.Expression):
+                return _ev(n.body)
+            if isinstance(n, ast.Constant):
+                if type(n.value) not in (int, float):
+                    raise ValueError(f"Unsupported: {ast.dump(n)}")
+                return n.value
+            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.operand))
+            if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
             raise ValueError(f"Unsupported: {ast.dump(n)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
def _ev(n):
if isinstance(n, ast.Expression):
return _ev(n.body)
if isinstance(n, ast.Constant):
if type(n.value) not in (int, float):
raise ValueError(f"Unsupported: {ast.dump(n)}")
return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
🧰 Tools
🪛 Ruff (0.15.15)

[error] 58-58: Multiple statements on one line (colon)

(E701)


[error] 59-59: Multiple statements on one line (colon)

(E701)


[error] 60-60: Multiple statements on one line (colon)

(E701)


[error] 61-61: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/custom_tools/tools_with_latency.py` around lines 57 - 62, The
helper function _ev currently treats booleans as numbers because
isinstance(n.value, (int, float)) accepts bool; update _ev to explicitly reject
bool (e.g., check type(n.value) is int or float or add "and not
isinstance(n.value, bool)") and expand the condensed one-line if-return branches
into multi-line blocks for readability and to satisfy Ruff E701; adjust the
branches handling ast.Expression, ast.Constant, ast.UnaryOp, and ast.BinOp
within _ev accordingly.

result = _ev(ast.parse(expression, mode="eval"))
Comment on lines 51 to +63
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the relevant portion of the file with line numbers
FILE="examples/python/custom_tools/tools_with_latency.py"
nl -ba "$FILE" | sed -n '1,120p' > /tmp/ttls_top.txt
echo "=== $FILE (lines 1-120) ==="
cat /tmp/ttls_top.txt

# Extract calculate_with_tracking and any helper(s) for context
echo
echo "=== Search for calculate_with_tracking definition ==="
rg -n "def calculate_with_tracking|calculate_with_tracking" "$FILE"

# Look for expression validation or any MAX_* constants in the file
echo
echo "=== Search for expression bounds/validation patterns in the file ==="
rg -n "MAX_|len\(expression\)|expression.*(len|depth|pow|exp|bound)" "$FILE" || true

# Find other occurrences of Pow / AST evaluator in repo to see if this helper is reused
echo
echo "=== Search repo for the _ev(ast.parse(... mode=\"eval\")) pattern ==="
rg -n "_ev\\(ast\\.parse\\(expression, mode=\"eval\"\\)\\)|ast\\.Pow|Unsupported: \\{ast\\.dump\\(n\\)\\}" -S .

Repository: MervinPraison/PraisonAI

Length of output: 110


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="examples/python/custom_tools/tools_with_latency.py"

echo "=== Showing file with line numbers around the referenced range (40-100) ==="
cat -n "$FILE" | sed -n '40,110p'

echo
echo "=== Find calculate_with_tracking definition and usage in this file ==="
rg -n "def calculate_with_tracking|calculate_with_tracking\\(" "$FILE" || true

echo
echo "=== Find the AST evaluator helper(s) inside the tool ==="
rg -n "_ev\\b|ast\\.parse\\(expression, mode=\"eval\"\\)|Unsupported: \\{ast\\.dump\\(n\\)\\}|_OPS\\b|ast\\.Pow" "$FILE" || true

echo
echo "=== Check for any existing complexity/length bounds in this file ==="
rg -n "MAX_|min\\(|max\\(|len\\(expression\\)|depth|recurs|exponent|pow\\b.*(bound|limit)|Expression too|Exponent too" "$FILE" || true

echo
echo "=== Check for isinstance(n.value, (int, float)) specifically ==="
rg -n "isinstance\\(n\\.value, \\(int, float\\)\\)|isinstance\\(.*n\\.value.*int" "$FILE" || true

echo
echo "=== Repo-wide: see if this evaluator pattern exists elsewhere (optional but helps) ==="
rg -n "_ev\\(ast\\.parse\\(expression, mode=\"eval\"\\)\\)|Unsupported: \\{ast\\.dump\\(n\\)\\}" -S . || true

Repository: MervinPraison/PraisonAI

Length of output: 5643


Add complexity/length limits to the AST expression evaluator in calculate_with_tracking

RCE is mitigated, but the evaluator still has no bounds (no MAX_*/depth/size checks in examples/python/custom_tools/tools_with_latency.py), so attacker-controlled expression can trigger CPU/memory exhaustion via deeply nested expressions and unrestricted ast.Pow.

  • isinstance(n.value, (int, float)) also accepts bool (since bool is a subclass of int); restrict to exact numeric types.
Suggested guard rails
+        MAX_EXPRESSION_LENGTH = 200
+        MAX_DEPTH = 32
+        MAX_EXPONENT = 12
+
+        if len(expression) > MAX_EXPRESSION_LENGTH:
+            raise ValueError("Expression too long")
+
-        def _ev(n):
-            if isinstance(n, ast.Expression): return _ev(n.body)
+        def _ev(n, depth=0):
+            if depth > MAX_DEPTH:
+                raise ValueError("Expression too complex")
+            if isinstance(n, ast.Expression):
+                return _ev(n.body, depth + 1)
             ...
-            if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+            if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+                left = _ev(n.left, depth + 1)
+                right = _ev(n.right, depth + 1)
+                if isinstance(n.op, ast.Pow) and abs(right) > MAX_EXPONENT:
+                    raise ValueError("Exponent too large")
+                return _OPS[type(n.op)](left, right)
🧰 Tools
🪛 Ruff (0.15.15)

[error] 58-58: Multiple statements on one line (colon)

(E701)


[error] 59-59: Multiple statements on one line (colon)

(E701)


[error] 60-60: Multiple statements on one line (colon)

(E701)


[error] 61-61: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/custom_tools/tools_with_latency.py` around lines 51 - 63, The
AST evaluator in calculate_with_tracking uses _ev and _OPS but lacks bounds and
treats bool as numeric; update _ev to enforce maximum recursion depth and
maximum AST node/operand count (e.g., MAX_DEPTH, MAX_NODES) and track/abort when
exceeded, restrict the numeric check to exact types (use type(n.value) is int or
type(n.value) is float) instead of isinstance, and limit or sanitize the Pow
operator (e.g., disallow ast.Pow or clamp exponents) to prevent runaway
CPU/memory usage.

return f"Result: {result}"
except Exception as e:
return f"Error: {str(e)}"
Expand Down
16 changes: 12 additions & 4 deletions examples/python/mcp/mcp-tools-server-sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ def calculate(expression: str) -> dict:
expression: Math expression (e.g., "2 + 2 * 3")
"""
try:
allowed = set("0123456789+-*/.() ")
if not all(c in allowed for c in expression):
return {"error": "Invalid characters"}
return {"expression": expression, "result": eval(expression)}
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +40 to +43
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
Comment on lines +44 to +49
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Split the one-line evaluator branches.

Ruff is already flagging Lines 45-48 with E701, so this block will keep failing lint until each branch is expanded.

🧰 Tools
🪛 Ruff (0.15.15)

[error] 45-45: Multiple statements on one line (colon)

(E701)


[error] 46-46: Multiple statements on one line (colon)

(E701)


[error] 47-47: Multiple statements on one line (colon)

(E701)


[error] 48-48: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/mcp/mcp-tools-server-sse.py` around lines 44 - 49, The
one-line branches inside the _ev function violate the E701 rule; expand each
conditional into a multi-line if/elif/else block so each branch has its own
indented body (e.g., replace "if isinstance(n, ast.Expression): return
_ev(n.body)" with a multi-line if block), and do the same for the ast.Constant,
ast.UnaryOp, ast.BinOp checks and the final raise ValueError; keep the existing
logic and use the same symbols (_ev, ast.Expression, ast.Constant, ast.UnaryOp,
ast.BinOp, _OPS) so behavior remains unchanged.

return {"expression": expression, "result": _ev(ast.parse(expression, mode="eval"))}
Comment on lines +39 to +50
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject or cap exponentiation on the public SSE tool.

Because this server is exposed on 0.0.0.0, a client can send expressions such as 9**9999999 and tie up the process in operator.pow. If you need **, add strict literal/exponent/result limits first; otherwise the safer default is to drop ast.Pow from _OPS here.

🧰 Tools
🪛 Ruff (0.15.15)

[error] 45-45: Multiple statements on one line (colon)

(E701)


[error] 46-46: Multiple statements on one line (colon)

(E701)


[error] 47-47: Multiple statements on one line (colon)

(E701)


[error] 48-48: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/mcp/mcp-tools-server-sse.py` around lines 39 - 50, The SSE
tool's evaluator (_ev) currently allows ast.Pow via _OPS which enables expensive
exponentiation (e.g., 9**9999999); either remove ast.Pow from the _OPS dict or
add strict guards in _ev that validate exponentiation operands and result size
before calling operator.pow (e.g., require integer small non-negative exponents,
cap exponent value and/or result magnitude), and raise ValueError for disallowed
cases; update the _OPS/_ev logic accordingly so ast.Pow is either absent or only
executed after these checks.

except Exception as e:
return {"error": str(e)}

Expand Down
18 changes: 12 additions & 6 deletions examples/python/mcp/mcp-tools-server.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@ def calculate(expression: str) -> Dict[str, Any]:
The result of the calculation
"""
try:
# Safe evaluation of mathematical expressions
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
return {"error": "Invalid characters in expression"}

result = eval(expression) # Use safer alternative in production
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +51 to +54
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
Comment on lines +55 to +60
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Tighten numeric literal validation and address Ruff E701

  • Reject bool literals explicitly (isinstance(True, int) is True), so only type(n.value) in (int, float) is allowed.
  • Split the if ...: return ... one-liners to avoid Ruff E701.
Suggested fix
         def _ev(n):
-            if isinstance(n, ast.Expression): return _ev(n.body)
-            if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
-            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
-            if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+            if isinstance(n, ast.Expression):
+                return _ev(n.body)
+            if isinstance(n, ast.Constant):
+                if type(n.value) not in (int, float):
+                    raise ValueError(f"Unsupported: {ast.dump(n)}")
+                return n.value
+            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.operand))
+            if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
             raise ValueError(f"Unsupported: {ast.dump(n)}")
🧰 Tools
🪛 Ruff (0.15.15)

[error] 56-56: Multiple statements on one line (colon)

(E701)


[error] 57-57: Multiple statements on one line (colon)

(E701)


[error] 58-58: Multiple statements on one line (colon)

(E701)


[error] 59-59: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/mcp/mcp-tools-server.py` around lines 55 - 60, The helper _ev
currently accepts bools as numeric literals and uses single-line if-statements
that trigger Ruff E701; update _ev to explicitly check the concrete type of
numeric constants by replacing "isinstance(n, ast.Constant) and
isinstance(n.value, (int, float))" with "isinstance(n, ast.Constant) and
type(n.value) in (int, float)" to reject bools, and expand all single-line if
...: return ... statements into multi-line if blocks (e.g., "if ...:\n    return
...") for the branches handling ast.Expression, ast.Constant, ast.UnaryOp, and
ast.BinOp that reference _OPS and ast so the logic remains identical but
conforms to style rules.

result = _ev(ast.parse(expression, mode="eval"))
return {"expression": expression, "result": result}
except Exception as e:
return {"error": str(e)}
Expand Down
19 changes: 12 additions & 7 deletions examples/python/mcp_server_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,18 @@ def calculate(expression: str) -> Dict[str, Any]:
The result of the calculation
"""
try:
# Safe evaluation of mathematical expressions
# In production, use a proper math parser
allowed_chars = set("0123456789+-*/.() ")
if not all(c in allowed_chars for c in expression):
return {"error": "Invalid characters in expression"}

result = eval(expression) # Note: Use safer alternative in production
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +53 to +56
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
Comment on lines +57 to +62
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the relevant portion of the file with line numbers
sed -n '1,120p' examples/python/mcp_server_example.py | nl -ba | sed -n '40,90p'

# Show if there's a Ruff config and whether E701 is enabled
ls -la | head
fd -a "ruff.*" . || true
fd -a "pyproject.toml" . || true
if [ -f pyproject.toml ]; then
  echo "---- pyproject.toml ----"
  rg -n "ruff|E701" pyproject.toml || true
fi

# Search for E701 mentions in any ruff config files
fd -a -g"*ruff*" . | while read -r f; do
  echo "---- $f ----"
  rg -n "E701" "$f" || true
done

Repository: MervinPraison/PraisonAI

Length of output: 110


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Show the relevant portion of the file with line numbers
nl -ba examples/python/mcp_server_example.py | sed -n '40,90p'

# Locate Ruff config files
echo "---- Ruff config candidates ----"
fd -a "ruff.*\\.toml|\\.ruff\\.toml|ruff\\.ini|\\.ruff\\.ini" . || true
fd -a "pyproject\\.toml" . || true
fd -a -g"*.toml" "ruff" . || true

# Search for Ruff settings in pyproject.toml and any .ruff*.toml/.ini
for f in $(fd -a "pyproject\\.toml|ruff\\.ini|\\.ruff\\.toml|ruff\\.toml|\\.ruff\\.ini" . || true); do
  echo "---- $f ----"
  rg -n "ruff|E701|ignore|select" "$f" || true
done

# Additionally, just search repository for the specific line pattern / function name
rg -n "def _ev\\(n\\)" -S . || true
rg -n "Unsupported: \\{ast\\.dump" -S examples/python/mcp_server_example.py || true

Repository: MervinPraison/PraisonAI

Length of output: 110


🏁 Script executed:

#!/bin/bash
set -euo pipefail

nl -ba examples/python/mcp_server_example.py | sed -n '45,90p'

# Try to find ruff config and E701 enablement
if [ -f pyproject.toml ]; then
  rg -n "ruff|E701" pyproject.toml || true
fi
# Search common files quickly
for f in $(fd -a "ruff\\.toml|ruff\\.ini|\\.ruff\\.toml|\\.ruff\\.ini|pyproject\\.toml" . || true); do
  echo "---- $f ----"
  rg -n "E701" "$f" || true
  rg -n "select\\s*=|ignore\\s*=|lint\\.select|lint\\.ignore" "$f" || true
done

# Confirm AST evaluation code exists as described
rg -n "ast\\.Constant" examples/python/mcp_server_example.py || true
rg -n "type\\(n\\.value\\)" examples/python/mcp_server_example.py || true

Repository: MervinPraison/PraisonAI

Length of output: 110


🏁 Script executed:

#!/bin/bash
set -euo pipefail

nl -ba examples/python/mcp_server_example.py | sed -n '45,90p'

echo "---- Ruff config checks ----"
for f in pyproject.toml ruff.toml .ruff.toml ruff.ini .ruff.ini; do
  if [ -f "$f" ]; then
    echo "---- $f ----"
    rg -n "E701" "$f" || true
    rg -n "select\\s*=|ignore\\s*=|lint\\.select|lint\\.ignore" "$f" || true
  fi
done

# Search within repository for ruff config
fd -a "ruff\\.toml|ruff\\.ini|\\.ruff\\.toml|\\.ruff\\.ini|pyproject\\.toml" . | while read -r f; do
  echo "---- $f ----"
  rg -n "E701" "$f" || true
done

# Verify boolean handling in _ev
sed -n '55,70p' examples/python/mcp_server_example.py

Repository: MervinPraison/PraisonAI

Length of output: 110


🏁 Script executed:

#!/bin/bash
set -euo pipefail

nl -ba examples/python/mcp_server_example.py | sed -n '50,75p'

echo "---- Ruff config (E701) ----"
if [ -f pyproject.toml ]; then
  rg -n "E701" pyproject.toml || true
  rg -n "lint\\.(select|ignore)" pyproject.toml || true
  rg -n "ruff" pyproject.toml || true
fi

# Search for any ruff config files in repo
fd -a "ruff\\.toml|ruff\\.ini|\\.ruff\\.toml|\\.ruff\\.ini|pyproject\\.toml" . | while read -r f; do
  echo "---- $f ----"
  rg -n "E701" "$f" || true
  rg -n "lint\\.(select|ignore)" "$f" || true
done

# Confirm current _ev constant guard
rg -n "if isinstance\\(n, ast\\.Constant\\)" -n examples/python/mcp_server_example.py || true

Repository: MervinPraison/PraisonAI

Length of output: 110


🏁 Script executed:

true

Repository: MervinPraison/PraisonAI

Length of output: 49


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- File snippet (lines 45-80) ----"
sed -n '45,80p' examples/python/mcp_server_example.py | cat -n

echo "---- Search for _ev function ----"
rg -n "def _ev\\(n\\)" -n examples/python/mcp_server_example.py || true
rg -n "ast\\.Constant" examples/python/mcp_server_example.py || true

echo "---- Locate Ruff config ----"
# list likely config files (avoid huge walks)
for f in pyproject.toml ruff.toml .ruff.toml ruff.ini .ruff.ini setup.cfg tox.ini .flake8 .flake8.ini; do
  [ -f "$f" ] && echo "FOUND: $f"
done

# search common config names anywhere in repo (small output)
fd -a "pyproject\\.toml|ruff\\.toml|\\.ruff\\.toml|ruff\\.ini|\\.ruff\\.ini" . 2>/dev/null || true

echo "---- Ruff config: check E701 / lint.select / lint.ignore ----"
# check top-level and any found config files
configs=(pyproject.toml ruff.toml .ruff.toml ruff.ini .ruff.ini)
for cfg in "${configs[@]}"; do
  if [ -f "$cfg" ]; then
    echo "---- $cfg ----"
    rg -n "E701" "$cfg" || true
    rg -n "lint\\.(select|ignore)" "$cfg" || true
    rg -n "select\\s*=|ignore\\s*=" "$cfg" || true
    rg -n "\\[tool\\.ruff\\]" "$cfg" || true
  fi
done

# also search any .ruff* and pyproject.toml across repo for E701
fd -a "pyproject\\.toml|ruff\\.toml|\\.ruff\\.toml|ruff\\.ini|\\.ruff\\.ini" . 2>/dev/null | while read -r f; do
  [ -f "$f" ] || continue
  rg -n "E701" "$f" && echo "---- $f (E701 match) ----" || true
done

Repository: MervinPraison/PraisonAI

Length of output: 2617


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- Show ruff.toml select block (first 120 lines) ----"
sed -n '1,120p' ruff.toml

echo "---- Search ruff.toml for E701/E70/E (select block context) ----"
rg -n "E701|E70\\b|\\bE\\b|select\\s*=\\s*\\[" ruff.toml || true

echo "---- Search all repo ruff/pyproject configs for E701/E70 ----"
for f in \
  ruff.toml \
  src/praisonai/pyproject.toml \
  src/praisonai-agents/pyproject.toml \
  src/praisonai-platform/pyproject.toml \
  examples/python/plugin_template/pyproject.toml \
  pyproject.toml; do
  if [ -f "$f" ]; then
    echo "==== $f ===="
    rg -n "E701|E70\\b" "$f" || true
    rg -n "\\[tool\\.ruff\\]|select\\s*=|ignore\\s*=" "$f" || true
  fi
done

echo "---- Print tool.ruff select from any pyproject.toml found (only relevant lines) ----"
for f in $(fd -a "pyproject\\.toml" . 2>/dev/null); do
  rg -n "\\[tool\\.ruff\\]|^select\\s*=|^ignore\\s*=" "$f" >/dev/null 2>&1 && echo "MATCH: $f"
done

Repository: MervinPraison/PraisonAI

Length of output: 1899


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Try to run ruff on the file (if ruff is available in the sandbox)
ruff --version || true
ruff check examples/python/mcp_server_example.py || true

Repository: MervinPraison/PraisonAI

Length of output: 5343


Exclude booleans from numeric evaluation and fix E701 single-line if branches

ast.Constant currently accepts True/False because bool is a subclass of int, contradicting the “numeric-only” behavior. Ruff also flags the single-line if ...: return ... branches as E701.

Suggested fix
         def _ev(n):
-            if isinstance(n, ast.Expression): return _ev(n.body)
-            if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
-            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
-            if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+            if isinstance(n, ast.Expression):
+                return _ev(n.body)
+            if isinstance(n, ast.Constant):
+                if type(n.value) not in (int, float):
+                    raise ValueError(f"Unsupported: {ast.dump(n)}")
+                return n.value
+            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.operand))
+            if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
             raise ValueError(f"Unsupported: {ast.dump(n)}")
🧰 Tools
🪛 Ruff (0.15.15)

[error] 58-58: Multiple statements on one line (colon)

(E701)


[error] 59-59: Multiple statements on one line (colon)

(E701)


[error] 60-60: Multiple statements on one line (colon)

(E701)


[error] 61-61: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/mcp_server_example.py` around lines 57 - 62, The helper _ev
currently treats ast.Constant values using isinstance(n.value, (int, float))
which wrongly accepts booleans (since bool is a subclass of int) and uses
single-line if statements flagged by E701; update _ev to explicitly exclude bool
(e.g., check that value is not isinstance(..., bool) or use type(...) in (int,
float)) and rewrite all single-line if statements into multi-line blocks for
readability and lint compliance, keeping the existing handling for
ast.Expression, ast.UnaryOp and ast.BinOp (using _OPS) and preserving the final
raise ValueError for unsupported nodes.

result = _ev(ast.parse(expression, mode="eval"))
Comment on lines 51 to +63
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate and print the relevant section around the evaluator
FILE="examples/python/mcp_server_example.py"
if [ ! -f "$FILE" ]; then
  echo "Missing $FILE"; exit 1
fi

echo "== Snippet context =="
nl -ba "$FILE" | sed -n '1,120p' | sed -n '40,95p'

echo "== Search for calculate / evaluator / expression limits =="
rg -n "def calculate|calculate\\(|_ev\\(|expression\\b|ast\\.Pow|MAX_|depth|depth\\b|SSE|ServerSentEvents|sse|ToolsMCPServer" "$FILE"

echo "== Search for server tool registration =="
rg -n "ToolsMCPServer|register|tool|calculate" "$FILE"

# Check for other similar evaluator implementations in nearby example files for consistency
echo "== Related mcp-tools-server evaluator =="
TARGETS=(examples/python/mcp/mcp-tools-server.py examples/python/mcp_server_example.py)
for t in "${TARGETS[@]}"; do
  if [ -f "$t" ]; then
    echo "--- $t ---"
    rg -n "def calculate|_ev\\(|ast\\.Pow|MAX_|depth|expression\\b" "$t" || true
  fi
done

Repository: MervinPraison/PraisonAI

Length of output: 151


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="examples/python/mcp_server_example.py"
if [ ! -f "$FILE" ]; then
  echo "Missing $FILE"; exit 1
fi

echo "== Snippet context (lines 40-95) =="
# Use sed to print with line numbers via awk (nl may not exist)
awk 'NR>=40 && NR<=95 {printf "%4d:%s\n", NR, $0}' "$FILE"

echo "== Search for calculate / evaluator / expression limits =="
rg -n "def calculate|calculate\\(|_ev\\(|expression\\b|ast\\.Pow|MAX_|depth\\b|SSE|ServerSentEvents|sse|ToolsMCPServer" "$FILE" || true

echo "== Search for server tool registration =="
rg -n "ToolsMCPServer|register|tool|calculate" "$FILE" || true

echo "== Related mcp-tools-server evaluator =="
for t in examples/python/mcp/mcp-tools-server.py examples/python/mcp_server_example.py; do
  if [ -f "$t" ]; then
    echo "--- $t ---"
    rg -n "def calculate|_ev\\(|ast\\.Pow|MAX_|depth\\b|expression\\b" "$t" || true
  fi
done

Repository: MervinPraison/PraisonAI

Length of output: 5841


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Print relevant context for SSE/tool registration
FILE="examples/python/mcp_server_example.py"
echo "== Registration/SSE wiring (lines 100-170) =="
awk 'NR>=95 && NR<=170 {printf "%4d:%s\n", NR, $0}' "$FILE"

echo "== Check for any existing MAX_/limit/depth logic =="
rg -n "MAX_|limit|depth|exponent|expression.*len|ast\\.walk" "$FILE" || true

# Inspect related file for same evaluator
FILE2="examples/python/mcp/mcp-tools-server.py"
echo "== Related evaluator wiring (mcp-tools-server.py lines 1-120) =="
awk 'NR>=1 && NR<=120 {printf "%4d:%s\n", NR, $0}' "$FILE2"

echo "== Related file any existing limits =="
rg -n "MAX_|limit|depth|exponent|expression.*len" "$FILE2" || true

# Confirm whether boolean constants are allowed by the predicate
echo "== Sanity check: bool is instance of int? (Python runtime) =="
python3 - <<'PY'
print(isinstance(True,(int,float)))
PY

# Confirm ruff E701 possibility by checking for single-line if statements
echo "== Lines with 'if ...: return' patterns (potential E701) =="
rg -n "if .+:\s*return" "$FILE" "$FILE2" || true

Repository: MervinPraison/PraisonAI

Length of output: 7985


Put resource limits around this evaluator before serving it.

  • calculate() is exposed via ToolsMCPServer over SSE, and although it blocks code execution, it still evaluates attacker-controlled arithmetic with no caps on expression length, nesting depth, or exponent size—enabling resource-exhaustion DoS.
  • Boolean literals are accepted because ast.Constant allows isinstance(n.value, (int, float)), and bool is a subclass of int.
  • The single-line if ...: return ... branches in _ev match Ruff’s E701 pattern.
Suggested guard rails
+        MAX_EXPRESSION_LENGTH = 200
+        MAX_DEPTH = 32
+        MAX_EXPONENT = 12
+
+        if len(expression) > MAX_EXPRESSION_LENGTH:
+            raise ValueError("Expression too long")
+
-        def _ev(n):
-            if isinstance(n, ast.Expression): return _ev(n.body)
+        def _ev(n, depth=0):
+            if depth > MAX_DEPTH:
+                raise ValueError("Expression too complex")
+            if isinstance(n, ast.Expression):
+                return _ev(n.body, depth + 1)
             ...
-            if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+            if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+                left = _ev(n.left, depth + 1)
+                right = _ev(n.right, depth + 1)
+                if isinstance(n.op, ast.Pow) and abs(right) > MAX_EXPONENT:
+                    raise ValueError("Exponent too large")
+                return _OPS[type(n.op)](left, right)
🧰 Tools
🪛 Ruff (0.15.15)

[error] 58-58: Multiple statements on one line (colon)

(E701)


[error] 59-59: Multiple statements on one line (colon)

(E701)


[error] 60-60: Multiple statements on one line (colon)

(E701)


[error] 61-61: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@examples/python/mcp_server_example.py` around lines 51 - 63, The evaluator
exposed by ToolsMCPServer.calculate() is vulnerable to DoS and accepts bools;
update the _ev evaluator to (1) enforce explicit caps before parsing/evaluating:
limit input string length, maximum AST node count/depth, and restrict numeric
magnitude (e.g., max exponent/absolute value); (2) reject booleans by checking
type(n.value) is int/float but not bool or explicitly isinstance(n.value, bool)
and raising; (3) whitelist allowed AST node types (Expression, Constant,
UnaryOp, BinOp, specific operator classes) and raise on anything else; and (4)
fix single-line if returns in _ev to use multi-line blocks to satisfy E701.
Reference symbols: calculate(), ToolsMCPServer, _ev, ast.parse, and the _OPS
mapping when applying these guards.

return {"expression": expression, "result": result}
except Exception as e:
return {"error": str(e)}
Expand Down
16 changes: 13 additions & 3 deletions src/praisonai-agents/benchmarks/real_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,20 @@ def get_time(timezone: Literal["est", "pst"]) -> str:


def calculate(expression: str) -> str:
"""Calculate a math expression."""
"""Calculate a math expression safely."""
try:
result = eval(expression)
return f"Result: {result}"
import ast, operator
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
Comment on lines +85 to +88
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Allowing the exponentiation operator (ast.Pow / **) in the safe evaluator introduces a Denial of Service (DoS) vulnerability. An attacker can supply an expression with nested exponentiation (e.g., 9**9**9**9 or 10**1000000000) that consumes excessive CPU and memory, potentially hanging or crashing the server.

Since this is a simple calculator tool, it is safest to remove ast.Pow from the allowed operators.

Suggested change
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod, ast.Pow: operator.pow,
ast.USub: operator.neg, ast.UAdd: operator.pos}
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
ast.Div: operator.truediv, ast.FloorDiv: operator.floordiv,
ast.Mod: operator.mod,
ast.USub: operator.neg, ast.UAdd: operator.pos}

def _ev(n):
if isinstance(n, ast.Expression): return _ev(n.body)
if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
raise ValueError(f"Unsupported: {ast.dump(n)}")
Comment on lines +89 to +94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/praisonai-agents/benchmarks/real_benchmark.py"

echo "== File stats =="
wc -l "$FILE" | cat

echo "== Snippet around lines 70-110 =="
nl -ba "$FILE" | sed -n '70,110p'

Repository: MervinPraison/PraisonAI

Length of output: 234


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Try to locate any other _ev occurrences and any Ruff/flake8 configs mentioning E701
FILE="src/praisonai-agents/benchmarks/real_benchmark.py"
rg -n "def _ev\\b" -S "$FILE" || true
rg -n "E701" -S . || true
rg -n "ruff" -S pyproject.toml setup.cfg .ruff.toml . || true

Repository: MervinPraison/PraisonAI

Length of output: 10209


🏁 Script executed:

#!/bin/bash
set -euo pipefail

FILE="src/praisonai-agents/benchmarks/real_benchmark.py"
echo "== _ev() implementation (around line 80-105) =="
awk 'NR>=75 && NR<=105 {printf "%d:%s\n", NR, $0}' "$FILE"

echo
echo "== ruff.toml (top) =="
if [ -f "ruff.toml" ]; then
  awk 'NR<=120 {print NR ":" $0}' ruff.toml
else
  echo "ruff.toml not found"
fi

Repository: MervinPraison/PraisonAI

Length of output: 2901


Keep _ev’s constant allowlist numeric-only and split single-line branches to satisfy Ruff E701.
src/praisonai-agents/benchmarks/real_benchmark.py:90-93 still accepts True/False (since bool subclasses int) and the one-line if ...: return ... statements trigger Ruff E701.

Suggested fix
         def _ev(n):
-            if isinstance(n, ast.Expression): return _ev(n.body)
-            if isinstance(n, ast.Constant) and isinstance(n.value, (int, float)): return n.value
-            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.operand))
-            if isinstance(n, ast.BinOp) and type(n.op) in _OPS: return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
+            if isinstance(n, ast.Expression):
+                return _ev(n.body)
+            if isinstance(n, ast.Constant):
+                if type(n.value) not in (int, float):
+                    raise ValueError(f"Unsupported: {ast.dump(n)}")
+                return n.value
+            if isinstance(n, ast.UnaryOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.operand))
+            if isinstance(n, ast.BinOp) and type(n.op) in _OPS:
+                return _OPS[type(n.op)](_ev(n.left), _ev(n.right))
             raise ValueError(f"Unsupported: {ast.dump(n)}")
🧰 Tools
🪛 Ruff (0.15.15)

[error] 90-90: Multiple statements on one line (colon)

(E701)


[error] 91-91: Multiple statements on one line (colon)

(E701)


[error] 92-92: Multiple statements on one line (colon)

(E701)


[error] 93-93: Multiple statements on one line (colon)

(E701)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai-agents/benchmarks/real_benchmark.py` around lines 89 - 94, The
helper _ev should only accept real numeric constants (not booleans) and must not
use single-line if/return branches; update the isinstance checks in _ev to
verify numeric types while excluding bool (e.g., change "isinstance(n.value,
(int, float))" to "isinstance(n.value, (int, float)) and not isinstance(n.value,
bool)"), and expand each one-line conditional like "if ...: return ..." into
multi-line form with explicit return statements; keep references to _ev and the
operator map _OPS when modifying the UnaryOp and BinOp branches.

return f"Result: {_ev(ast.parse(expression, mode='eval'))}"
except Exception:
return "Error: Invalid expression"

Expand Down
38 changes: 28 additions & 10 deletions src/praisonai-platform/praisonai_platform/services/auth_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,37 @@
_pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

_DEFAULT_SECRET = "dev-secret-change-me"
JWT_SECRET = os.environ.get("PLATFORM_JWT_SECRET", _DEFAULT_SECRET)
JWT_SECRET = os.environ.get("PLATFORM_JWT_SECRET")
JWT_ALGORITHM = "HS256"
JWT_TTL_SECONDS = int(os.environ.get("PLATFORM_JWT_TTL", str(30 * 24 * 3600)))

if JWT_SECRET == _DEFAULT_SECRET and os.environ.get("PLATFORM_ENV", "dev") != "dev":
raise RuntimeError(
"PLATFORM_JWT_SECRET must be set to a strong random value in production. "
"Set PLATFORM_ENV=dev to suppress this check during development."
)
if JWT_SECRET is None or JWT_SECRET == _DEFAULT_SECRET:
Comment on lines +26 to +30
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 security Empty-string secret bypasses all guards

os.environ.get("PLATFORM_JWT_SECRET") returns "" when the variable is set to an empty string. Because "" is neither None nor equal to _DEFAULT_SECRET, the condition on line 30 is false, and the empty string silently becomes the JWT signing key. PyJWT accepts an empty HMAC key without error, so any production deployment that accidentally sets PLATFORM_JWT_SECRET= (empty) will sign tokens with no meaningful secret — effectively the same exposure this PR is hardening against.

# Determine if we are in an explicit dev environment.
# Only trust PLATFORM_ENV when it is explicitly set — an unset value
# must NOT silently fall back to "dev" because production deployments
# that forget the variable would run with a well-known signing key.
_explicit_env = os.environ.get("PLATFORM_ENV")
if _explicit_env == "dev":
# Developer explicitly opted in to insecure defaults — allow it.
JWT_SECRET = _DEFAULT_SECRET
else:
# Either production or environment not specified.
# Auto-generate an ephemeral secret so the service can start, but
# warn loudly because tokens will not survive a restart.
import secrets as _secrets
import warnings as _warnings

JWT_SECRET = _secrets.token_urlsafe(32)
_warnings.warn(
"PLATFORM_JWT_SECRET is not set. Auto-generated an ephemeral "
"signing key — JWT tokens will be invalidated on restart. "
"Set PLATFORM_JWT_SECRET to a stable random value for production "
"use, or set PLATFORM_ENV=dev to use the development default.",
stacklevel=2,
)
Comment on lines +39 to +53
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

Auto-generating an ephemeral random secret in production when PLATFORM_JWT_SECRET is missing can lead to severe reliability issues. In multi-instance deployments (e.g., behind a load balancer or in Kubernetes), each instance will generate a different key, causing random authentication failures (HTTP 401) for users. Additionally, any container restart or redeployment will invalidate all active sessions.

It is highly recommended to fail fast by raising a RuntimeError when running in production without a configured secret. This forces administrators to configure the secret correctly and prevents silent, hard-to-debug session issues.

Suggested change
else:
# Either production or environment not specified.
# Auto-generate an ephemeral secret so the service can start, but
# warn loudly because tokens will not survive a restart.
import secrets as _secrets
import warnings as _warnings
JWT_SECRET = _secrets.token_urlsafe(32)
_warnings.warn(
"PLATFORM_JWT_SECRET is not set. Auto-generated an ephemeral "
"signing key — JWT tokens will be invalidated on restart. "
"Set PLATFORM_JWT_SECRET to a stable random value for production "
"use, or set PLATFORM_ENV=dev to use the development default.",
stacklevel=2,
)
else:
# Either production or environment not specified.
# Fail fast to prevent silent multi-instance auth failures and session invalidation on restart.
raise RuntimeError(
"PLATFORM_JWT_SECRET must be set to a strong random value in production. "
"Set PLATFORM_ENV=dev to suppress this check during development."
)

Comment on lines +46 to +53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The warning text "PLATFORM_JWT_SECRET is not set" is factually incorrect when the secret is explicitly set to the well-known default string. An operator who purposely wrote PLATFORM_JWT_SECRET=dev-secret-change-me will see a confusing message claiming the variable is missing, potentially masking what the code actually rejected.

Suggested change
JWT_SECRET = _secrets.token_urlsafe(32)
_warnings.warn(
"PLATFORM_JWT_SECRET is not set. Auto-generated an ephemeral "
"signing key — JWT tokens will be invalidated on restart. "
"Set PLATFORM_JWT_SECRET to a stable random value for production "
"use, or set PLATFORM_ENV=dev to use the development default.",
stacklevel=2,
)
JWT_SECRET = _secrets.token_urlsafe(32)
_warnings.warn(
"PLATFORM_JWT_SECRET is not set or is set to the insecure default. "
"Auto-generated an ephemeral signing key — JWT tokens will be "
"invalidated on restart. Set PLATFORM_JWT_SECRET to a stable random "
"value for production use, or set PLATFORM_ENV=dev to use the "
"development default.",
stacklevel=1,
)

Comment on lines +30 to +53
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Restore fail-fast behavior for explicit non-dev environments.

This else branch currently handles both PLATFORM_ENV unset and explicitly non-dev values, so PLATFORM_ENV=prod without PLATFORM_JWT_SECRET now boots with an ephemeral signing key instead of failing. That misses the PR’s hardening goal and leaves non-dev deployments in a broken/token-unstable state.

Suggested fix
 if JWT_SECRET is None or JWT_SECRET == _DEFAULT_SECRET:
     _explicit_env = os.environ.get("PLATFORM_ENV")
     if _explicit_env == "dev":
         JWT_SECRET = _DEFAULT_SECRET
+    elif _explicit_env is not None:
+        raise RuntimeError(
+            "PLATFORM_JWT_SECRET must be set when PLATFORM_ENV is not 'dev'."
+        )
     else:
         import secrets as _secrets
         import warnings as _warnings

         JWT_SECRET = _secrets.token_urlsafe(32)
         _warnings.warn(
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai-platform/praisonai_platform/services/auth_service.py` around
lines 30 - 53, The current branch treats PLATFORM_ENV unset and explicitly
non-"dev" the same and auto-generates an ephemeral JWT_SECRET; instead, change
the logic in the JWT_SECRET initialization (the block guarded by JWT_SECRET is
None or JWT_SECRET == _DEFAULT_SECRET and the check of
os.environ.get("PLATFORM_ENV")) so that: if _explicit_env is None (unset) you
keep the existing behavior of auto-generating and warning, but if _explicit_env
is explicitly set and not "dev" (e.g., "prod") you fail-fast — raise an
exception (or exit) with a clear message telling the operator to set
PLATFORM_JWT_SECRET to a stable value or set PLATFORM_ENV=dev; reference
JWT_SECRET, _DEFAULT_SECRET and PLATFORM_ENV in the error handling so the code
refuses to start in explicit non-dev environments without a provided secret.


# Type-narrow: at this point JWT_SECRET is always a non-None str.
assert JWT_SECRET is not None # noqa: S105


class AuthService:
Expand Down Expand Up @@ -113,10 +135,6 @@ async def login(self, email: str, password: str) -> Optional[tuple[User, str]]:

def _issue_token(self, user: User) -> str:
"""Issue a JWT for a user."""
if JWT_SECRET == _DEFAULT_SECRET and os.environ.get("PLATFORM_ENV", "dev") != "dev":
raise RuntimeError(
"Refusing to issue JWT with default PLATFORM_JWT_SECRET outside dev"
)
now = datetime.now(timezone.utc)
payload = {
"sub": user.id,
Expand Down