Python a2ui_core: Add basic catalog function implementations#1576
Python a2ui_core: Add basic catalog function implementations#1576nan-yu wants to merge 3 commits into
Conversation
…catalog components
- Implement ExpressionParser mirroring the TypeScript version to support dynamic formatString placeholders. - Add basic catalog function implementations (arithmetic, comparison, logical, string, formatting, and validation).
|
Warning Gemini encountered an error creating the review. You can try again by commenting |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces the a2ui_core Python library, which provides core data models, reactive state management, and JSON schema validation logic for the A2UI protocol (v0.9+). It includes auto-generated component and function schemas, an expression parser, basic catalog function implementations, and comprehensive unit tests. The review feedback highlights several critical issues: negative numbers are not correctly parsed as number literals in the expression parser, a recursion depth check is missing in parse_expression, the date formatter lacks support for several documented TR35 tokens, division by zero is not handled correctly for negative or zero numerators, and redundant float conversions exist in the arithmetic implementations.
| if self.is_digit(scanner.peek()): | ||
| return self.parse_number_literal(scanner) |
There was a problem hiding this comment.
Negative numbers (e.g., -10) are currently not recognized as number literals because the check only looks for digits. As a result, they fall through to scan_path_or_identifier and are parsed as data paths (e.g., {"path": "-10"}). Update the condition to also match a leading minus sign followed by a digit.
| if self.is_digit(scanner.peek()): | |
| return self.parse_number_literal(scanner) | |
| if self.is_digit(scanner.peek()) or (scanner.peek() == "-" and self.is_digit(scanner.peek(1))): | |
| return self.parse_number_literal(scanner) |
| def parse_number_literal(self, scanner: Scanner) -> Union[int, float]: | ||
| start = scanner.pos | ||
| while not scanner.is_at_end() and ( | ||
| self.is_digit(scanner.peek()) or scanner.peek() == "." | ||
| ): | ||
| scanner.advance() | ||
| num_str = scanner.input[start : scanner.pos] | ||
| return float(num_str) if "." in num_str else int(num_str) |
There was a problem hiding this comment.
To correctly parse negative number literals, update parse_number_literal to consume the leading minus sign if present.
| def parse_number_literal(self, scanner: Scanner) -> Union[int, float]: | |
| start = scanner.pos | |
| while not scanner.is_at_end() and ( | |
| self.is_digit(scanner.peek()) or scanner.peek() == "." | |
| ): | |
| scanner.advance() | |
| num_str = scanner.input[start : scanner.pos] | |
| return float(num_str) if "." in num_str else int(num_str) | |
| def parse_number_literal(self, scanner: Scanner) -> Union[int, float]: | |
| start = scanner.pos | |
| if scanner.peek() == "-": | |
| scanner.advance() | |
| while not scanner.is_at_end() and ( | |
| self.is_digit(scanner.peek()) or scanner.peek() == "." | |
| ): | |
| scanner.advance() | |
| num_str = scanner.input[start : scanner.pos] | |
| return float(num_str) if "." in num_str else int(num_str) |
| def parse_expression(self, expr: str, depth: int = 0) -> Any: | ||
| expr = expr.strip() | ||
| if not expr: | ||
| return "" |
There was a problem hiding this comment.
The recursion depth limit check is missing in parse_expression. Because parse_expression is called recursively for nested interpolations (e.g., ${${foo}}), an excessively nested expression can cause a RecursionError or stack overflow. Add the depth check at the entry of parse_expression.
| def parse_expression(self, expr: str, depth: int = 0) -> Any: | |
| expr = expr.strip() | |
| if not expr: | |
| return "" | |
| def parse_expression(self, expr: str, depth: int = 0) -> Any: | |
| if depth > self.MAX_DEPTH: | |
| raise ValueError("Max recursion depth reached in parse") | |
| expr = expr.strip() | |
| if not expr: | |
| return "" |
| dt = datetime.datetime.fromisoformat(str(val).replace("Z", "+00:00")) | ||
| if fmt == "ISO": | ||
| return dt.isoformat().replace("+00:00", ".000Z") | ||
| py_fmt = ( | ||
| str(fmt) | ||
| .replace("yyyy", "%Y") | ||
| .replace("MM", "%m") | ||
| .replace("dd", "%d") | ||
| .replace("HH", "%H") | ||
| .replace("mm", "%M") | ||
| .replace("ss", "%S") | ||
| ) | ||
| return dt.strftime(py_fmt) |
There was a problem hiding this comment.
The current _format_date implementation only replaces a small subset of the documented TR35 tokens (yyyy, MM, dd, HH, mm, ss). Standard tokens like yy, MMM, MMMM, EEEE, E, d, H, hh, h, and a are ignored, leading to broken formatting. Additionally, literal % characters in the format string can cause strftime errors. Update the replacement mapping to support all documented TR35 tokens and escape % characters.
dt = datetime.datetime.fromisoformat(str(val).replace("Z", "+00:00"))
if fmt == "ISO":
return dt.isoformat().replace("+00:00", ".000Z")
py_fmt = (
str(fmt)
.replace("%", "%%")
.replace("yyyy", "%Y")
.replace("yy", "%y")
.replace("MMMM", "%B")
.replace("MMM", "%b")
.replace("MM", "%m")
.replace("M", "%m")
.replace("EEEE", "%A")
.replace("E", "%a")
.replace("dd", "%d")
.replace("d", "%d")
.replace("HH", "%H")
.replace("H", "%H")
.replace("hh", "%I")
.replace("h", "%I")
.replace("mm", "%M")
.replace("ss", "%S")
.replace("a", "%p")
)
return dt.strftime(py_fmt)| DivideImplementation = create_function_implementation( | ||
| DivideApi, | ||
| lambda args, context=None, abort_signal=None: ( | ||
| math.inf | ||
| if _to_float(args["b"]) == 0 | ||
| else ( | ||
| int(_to_float(args["a"]) / _to_float(args["b"])) | ||
| if (_to_float(args["a"]) / _to_float(args["b"])).is_integer() | ||
| else (_to_float(args["a"]) / _to_float(args["b"])) | ||
| ) | ||
| ), | ||
| ) |
There was a problem hiding this comment.
The division implementation always returns math.inf when dividing by zero, regardless of the sign of the numerator a. If a is negative, it should return -math.inf, and if a is zero, it should return math.nan. Refactor the implementation to handle these cases correctly and avoid redundant conversions.
def _divide(args, context=None, abort_signal=None):
a = _to_float(args["a"])
b = _to_float(args["b"])
if b == 0:
if a > 0:
return math.inf
elif a < 0:
return -math.inf
else:
return math.nan
res = a / b
return int(res) if res.is_integer() else res
DivideImplementation = create_function_implementation(DivideApi, _divide)| AddImplementation = create_function_implementation( | ||
| AddApi, | ||
| lambda args, context=None, abort_signal=None: ( | ||
| int(_to_float(args["a"]) + _to_float(args["b"])) | ||
| if (_to_float(args["a"]) + _to_float(args["b"])).is_integer() | ||
| else (_to_float(args["a"]) + _to_float(args["b"])) | ||
| ), | ||
| ) | ||
|
|
||
| SubtractImplementation = create_function_implementation( | ||
| SubtractApi, | ||
| lambda args, context=None, abort_signal=None: ( | ||
| int(_to_float(args["a"]) - _to_float(args["b"])) | ||
| if (_to_float(args["a"]) - _to_float(args["b"])).is_integer() | ||
| else (_to_float(args["a"]) - _to_float(args["b"])) | ||
| ), | ||
| ) | ||
|
|
||
| MultiplyImplementation = create_function_implementation( | ||
| MultiplyApi, | ||
| lambda args, context=None, abort_signal=None: ( | ||
| int(_to_float(args["a"]) * _to_float(args["b"])) | ||
| if (_to_float(args["a"]) * _to_float(args["b"])).is_integer() | ||
| else (_to_float(args["a"]) * _to_float(args["b"])) | ||
| ), | ||
| ) |
There was a problem hiding this comment.
The Add, Subtract, and Multiply implementations perform redundant float conversions and arithmetic operations (evaluating the same expression up to three times). Refactor them to use helper functions that perform the calculation once.
| AddImplementation = create_function_implementation( | |
| AddApi, | |
| lambda args, context=None, abort_signal=None: ( | |
| int(_to_float(args["a"]) + _to_float(args["b"])) | |
| if (_to_float(args["a"]) + _to_float(args["b"])).is_integer() | |
| else (_to_float(args["a"]) + _to_float(args["b"])) | |
| ), | |
| ) | |
| SubtractImplementation = create_function_implementation( | |
| SubtractApi, | |
| lambda args, context=None, abort_signal=None: ( | |
| int(_to_float(args["a"]) - _to_float(args["b"])) | |
| if (_to_float(args["a"]) - _to_float(args["b"])).is_integer() | |
| else (_to_float(args["a"]) - _to_float(args["b"])) | |
| ), | |
| ) | |
| MultiplyImplementation = create_function_implementation( | |
| MultiplyApi, | |
| lambda args, context=None, abort_signal=None: ( | |
| int(_to_float(args["a"]) * _to_float(args["b"])) | |
| if (_to_float(args["a"]) * _to_float(args["b"])).is_integer() | |
| else (_to_float(args["a"]) * _to_float(args["b"])) | |
| ), | |
| ) | |
| def _add(args, context=None, abort_signal=None): | |
| res = _to_float(args["a"]) + _to_float(args["b"]) | |
| return int(res) if res.is_integer() else res | |
| AddImplementation = create_function_implementation(AddApi, _add) | |
| def _subtract(args, context=None, abort_signal=None): | |
| res = _to_float(args["a"]) - _to_float(args["b"]) | |
| return int(res) if res.is_integer() else res | |
| SubtractImplementation = create_function_implementation(SubtractApi, _subtract) | |
| def _multiply(args, context=None, abort_signal=None): | |
| res = _to_float(args["a"]) * _to_float(args["b"]) | |
| return int(res) if res.is_integer() else res | |
| MultiplyImplementation = create_function_implementation(MultiplyApi, _multiply) |
Description
Replace this paragraph with a description of what this PR is changing or adding, and why. Consider including before/after screenshots.
List which issues are fixed by this PR. For larger changes, raising an issue first helps reduce redundant work.
Pre-launch Checklist
If you need help, consider asking for advice on the discussion board.