Python a2ui_agent: Refactor a2ui_agent to depend on a2ui_core#1582
Python a2ui_agent: Refactor a2ui_agent to depend on a2ui_core#1582nan-yu wants to merge 15 commits into
Conversation
…catalog components
- Implement ExpressionParser mirroring the TypeScript version to support dynamic formatString placeholders. - Add basic catalog function implementations (arithmetic, comparison, logical, string, formatting, and validation).
…alysis, and schema recursion constraints
… dynamic evaluation
…management via protocol messages
…a text-based layout renderer for testing
…iCatalog and streaming parser
…ingParser to use dynamic values
…resh dependencies across samples and tools
There was a problem hiding this comment.
Code Review
This pull request introduces the a2ui_core Python library, which implements core state management, rendering, message processing, and schema validation for the A2UI v0.9+ specification. Key feedback highlights several critical issues and improvement opportunities: a NameError in model_catalog.py due to an undefined variable, incomplete TR35 date token formatting in function_impls.py, redundant subscriptions for TemplateChildList in generic_binder.py, an inefficient list expansion loop in data_model.py that could trigger OOM errors, silent acceptance of unclosed string literals in expression_parser.py, and a potential AttributeError in data_context.py when handling malformed action events.
| target_val = ( | ||
| payload | ||
| if ( | ||
| hasattr(fn.schema, "model_fields") | ||
| and "call" in fn.schema.model_fields | ||
| ) | ||
| else args | ||
| ) |
There was a problem hiding this comment.
A NameError will occur here because the variable payload is referenced but has not been defined in this scope. It should be replaced with the dictionary representing the full function call structure.
| target_val = ( | |
| payload | |
| if ( | |
| hasattr(fn.schema, "model_fields") | |
| and "call" in fn.schema.model_fields | |
| ) | |
| else args | |
| ) | |
| target_val = ( | |
| {"call": name, "args": args} | |
| if ( | |
| hasattr(fn.schema, "model_fields") | |
| and "call" in fn.schema.model_fields | |
| ) | |
| else args | |
| ) |
| py_fmt = ( | ||
| str(fmt) | ||
| .replace("yyyy", "%Y") | ||
| .replace("MM", "%m") | ||
| .replace("dd", "%d") | ||
| .replace("HH", "%H") | ||
| .replace("mm", "%M") | ||
| .replace("ss", "%S") | ||
| ) |
There was a problem hiding this comment.
The current _format_date implementation only replaces a small subset of the documented TR35 date pattern tokens. Common tokens like yy, MMMM, MMM, EEEE, E, hh, h, and a are completely ignored, which will cause formatting to fail or output raw pattern characters.
Using a single-pass regular expression replacement is the most robust way to map these tokens safely without causing recursive replacement collisions (e.g., replacing d after dd has already introduced %d).
| py_fmt = ( | |
| str(fmt) | |
| .replace("yyyy", "%Y") | |
| .replace("MM", "%m") | |
| .replace("dd", "%d") | |
| .replace("HH", "%H") | |
| .replace("mm", "%M") | |
| .replace("ss", "%S") | |
| ) | |
| token_map = { | |
| "EEEE": "%A", "E": "%a", | |
| "yyyy": "%Y", "yy": "%y", | |
| "MMMM": "%B", "MMM": "%b", "MM": "%m", "M": "%m", | |
| "dd": "%d", "d": "%d", | |
| "HH": "%H", "H": "%H", "hh": "%I", "h": "%I", | |
| "mm": "%M", "ss": "%S", "a": "%p" | |
| } | |
| pattern = re.compile("|".join(sorted(token_map.keys(), key=len, reverse=True))) | |
| py_fmt = pattern.sub(lambda m: token_map[m.group(0)], str(fmt)) |
| is_dynamic = ( | ||
| isinstance(value, dict) | ||
| and "path" in value | ||
| and isinstance(value["path"], str) | ||
| ) |
There was a problem hiding this comment.
The is_dynamic check should exclude TemplateChildList objects (which contain a componentId key) to align with DataContext.resolve_dynamic_value and avoid setting up redundant dummy subscriptions.
| is_dynamic = ( | |
| isinstance(value, dict) | |
| and "path" in value | |
| and isinstance(value["path"], str) | |
| ) | |
| is_dynamic = ( | |
| isinstance(value, dict) | |
| and "path" in value | |
| and isinstance(value["path"], str) | |
| and "componentId" not in value | |
| ) |
| current = current[token] | ||
| elif isinstance(current, list) and NUMERIC_PATTERN.match(token): |
There was a problem hiding this comment.
Expanding the list sequentially using a while loop with append is highly inefficient and can lead to Out Of Memory (OOM) errors or performance bottlenecks if an unreasonably large index is passed.
Using extend is much faster, and adding a safety threshold check protects against potential denial of service vectors.
| current = current[token] | |
| elif isinstance(current, list) and NUMERIC_PATTERN.match(token): | |
| if idx > 10000: | |
| raise ValueError(f"Index {idx} exceeds maximum allowed array size.") | |
| current.extend([None] * (idx - len(current) + 1)) |
| while not scanner.is_at_end(): | ||
| c = scanner.advance() | ||
| if c == "\\": | ||
| next_c = scanner.advance() | ||
| if next_c == "n": | ||
| result += "\n" | ||
| elif next_c == "t": | ||
| result += "\t" | ||
| elif next_c == "r": | ||
| result += "\r" | ||
| else: | ||
| result += next_c | ||
| elif c == quote: | ||
| break | ||
| else: | ||
| result += c |
There was a problem hiding this comment.
An unclosed string literal at the end of an expression is currently accepted silently because the loop terminates at the end of the scanner input without raising an error. It should raise a ValueError to ensure syntax correctness.
| while not scanner.is_at_end(): | |
| c = scanner.advance() | |
| if c == "\\": | |
| next_c = scanner.advance() | |
| if next_c == "n": | |
| result += "\n" | |
| elif next_c == "t": | |
| result += "\t" | |
| elif next_c == "r": | |
| result += "\r" | |
| else: | |
| result += next_c | |
| elif c == quote: | |
| break | |
| else: | |
| result += c | |
| closed = False | |
| while not scanner.is_at_end(): | |
| c = scanner.advance() | |
| if c == "\\": | |
| next_c = scanner.advance() | |
| if next_c == "n": | |
| result += "\n" | |
| elif next_c == "t": | |
| result += "\t" | |
| elif next_c == "r": | |
| result += "\r" | |
| else: | |
| result += next_c | |
| elif c == quote: | |
| closed = True | |
| break | |
| else: | |
| result += c | |
| if not closed: | |
| raise ValueError("Unclosed string literal") |
| if isinstance(action, dict) and "event" in action: | ||
| evt = copy.deepcopy(action["event"]) | ||
| resolved_context = {} | ||
| if isinstance(evt.get("context"), dict): | ||
| for k, v in evt["context"].items(): | ||
| resolved_context[k] = self.resolve_dynamic_value(v) | ||
| evt["context"] = resolved_context | ||
| return {"event": evt} |
There was a problem hiding this comment.
If action["event"] is not a dictionary (e.g., if it is malformed or of an unexpected type), calling evt.get("context") will raise an AttributeError. Adding an explicit type check ensures robust defensive programming.
| if isinstance(action, dict) and "event" in action: | |
| evt = copy.deepcopy(action["event"]) | |
| resolved_context = {} | |
| if isinstance(evt.get("context"), dict): | |
| for k, v in evt["context"].items(): | |
| resolved_context[k] = self.resolve_dynamic_value(v) | |
| evt["context"] = resolved_context | |
| return {"event": evt} | |
| if isinstance(action, dict) and "event" in action: | |
| evt = copy.deepcopy(action["event"]) | |
| if isinstance(evt, dict): | |
| resolved_context = {} | |
| if isinstance(evt.get("context"), dict): | |
| for k, v in evt["context"].items(): | |
| resolved_context[k] = self.resolve_dynamic_value(v) | |
| evt["context"] = resolved_context | |
| return {"event": evt} |
Description
Replace this paragraph with a description of what this PR is changing or adding, and why. Consider including before/after screenshots.
List which issues are fixed by this PR. For larger changes, raising an issue first helps reduce redundant work.
Pre-launch Checklist
If you need help, consider asking for advice on the discussion board.