From cf720f16fb8a1f9928e8ce70656d9f7ae200800e Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 31 May 2026 16:46:54 -0700 Subject: [PATCH 1/5] [test] Add structured tool-invocation assertion helper and adopt in chat model cross-language test Replace the weak "3 in output" substring check in the chat model cross-language e2e test with a structured assertion that the agent actually invoked the add tool with the correct arguments. Add two shared helpers to e2e test_utils: - collect_tool_invocations(log_dir): reads events-*.log written by the FileEventLogger, filters _tool_request_event records, and extracts each tool call's name and arguments from the nested function field. - assert_tool_invoked(invocations, name, arguments): asserts a matching tool call, normalizing arguments given as a dict or a JSON string. Cover the helpers with fixture-based unit tests that need no live model. --- .../chat_model_cross_language_test.py | 13 +- python/flink_agents/e2e_tests/test_utils.py | 84 ++++++++++++ .../e2e_tests/test_utils_unit_test.py | 129 ++++++++++++++++++ 3 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 python/flink_agents/e2e_tests/test_utils_unit_test.py diff --git a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_test.py b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_test.py index 8af10c3fc..ece74ecad 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/chat_model_cross_language_test.py @@ -36,7 +36,11 @@ from flink_agents.e2e_tests.e2e_tests_resource_cross_language.chat_model_cross_language_agent import ( ChatModelCrossLanguageAgent, ) -from flink_agents.e2e_tests.test_utils import pull_model +from flink_agents.e2e_tests.test_utils import ( + assert_tool_invoked, + collect_tool_invocations, + pull_model, +) current_dir = Path(__file__).parent @@ -72,6 +76,9 @@ def test_java_chat_model_integration( deserialize_datastream = input_datastream.map(lambda x: str(x)) agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + log_dir = tmp_path / "event_logs" + log_dir.mkdir(parents=True, exist_ok=True) + agents_env.get_config().set_str("baseLogDir", str(log_dir)) output_datastream = ( agents_env.from_datastream( input=deserialize_datastream, key_selector=lambda x: "orderKey" @@ -106,6 +113,8 @@ def test_java_chat_model_integration( with file.open() as f: actual_result.extend(f.readlines()) + invocations = collect_tool_invocations(log_dir) + assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) + joined = "\n".join(actual_result).lower() - assert "3" in joined, f"math answer missing '3': {actual_result!r}" assert "cat" in joined, f"creative answer missing 'cat': {actual_result!r}" diff --git a/python/flink_agents/e2e_tests/test_utils.py b/python/flink_agents/e2e_tests/test_utils.py index eab34a07b..29794726b 100644 --- a/python/flink_agents/e2e_tests/test_utils.py +++ b/python/flink_agents/e2e_tests/test_utils.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################# +import json import subprocess from pathlib import Path @@ -23,6 +24,89 @@ current_dir = Path(__file__).parent +def _normalize_arguments(arguments: object) -> dict: + """Return tool-call arguments as a dict, parsing a JSON string if needed. + + Args: + arguments: Tool-call arguments, either a mapping (Ollama path), a + JSON-encoded string (some providers ``json.dumps`` the arguments), + or ``None`` for a no-argument tool call. + + Returns: + The arguments as a dict; an empty dict when ``arguments`` is ``None``. + """ + if arguments is None: + return {} + if isinstance(arguments, str): + return json.loads(arguments) + return dict(arguments) + + +def collect_tool_invocations(log_dir: str | Path) -> list[dict]: + """Read ``events-*.log`` under ``log_dir`` and return tool invocations in order. + + Globs the per-subtask event-log files the ``FileEventLogger`` writes, parses + each JSONL record, and extracts every ``_tool_request_event`` tool call. The + tool-call dict is nested under ``function`` in the wire format. + + Args: + log_dir: Directory containing the ``events-*.log`` files (the configured + ``baseLogDir``). + + Returns: + Ordered list of ``{"name": str, "arguments": dict | str}``. Empty when the + model invoked no tool (a legitimate, assertable outcome). + """ + invocations = [] + for log_file in sorted(Path(log_dir).glob("events-*.log")): + with log_file.open() as handle: + for line in handle: + if not line.strip(): + continue + record = json.loads(line) + if record.get("eventType") != "_tool_request_event": + continue + tool_calls = record["event"]["attributes"].get("tool_calls", []) + for tool_call in tool_calls: + function = tool_call["function"] + invocations.append( + { + "name": function["name"], + "arguments": function["arguments"], + } + ) + return invocations + + +def assert_tool_invoked(invocations: list[dict], name: str, arguments: dict) -> None: + """Assert some invocation called tool ``name`` with arguments equal to ``arguments``. + + Argument values are compared after normalizing both sides to a dict (a + JSON-string ``arguments`` is parsed first), so the comparison is + order-independent and tolerant of providers that encode arguments as a string. + + Args: + invocations: Tool invocations as returned by :func:`collect_tool_invocations`. + name: Expected tool name. + arguments: Expected tool arguments. + + Raises: + AssertionError: If no invocation matches both ``name`` and ``arguments``; + the message dumps the actual invocations. + """ + expected_args = _normalize_arguments(arguments) + for invocation in invocations: + if invocation["name"] != name: + continue + if _normalize_arguments(invocation["arguments"]) == expected_args: + return + message = ( + f"No invocation of tool {name!r} with arguments {expected_args!r}; " + f"actual invocations: {invocations!r}" + ) + raise AssertionError(message) + + def pull_model(ollama_model: str) -> Client: """Run ollama pull ollama_model.""" try: diff --git a/python/flink_agents/e2e_tests/test_utils_unit_test.py b/python/flink_agents/e2e_tests/test_utils_unit_test.py new file mode 100644 index 000000000..f15089a4f --- /dev/null +++ b/python/flink_agents/e2e_tests/test_utils_unit_test.py @@ -0,0 +1,129 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +from pathlib import Path + +import pytest + +from flink_agents.e2e_tests.test_utils import ( + assert_tool_invoked, + collect_tool_invocations, +) + + +def _tool_request_record(tool_calls: list) -> dict: + """Build a record matching the FileEventLogger wire format for a tool request.""" + return { + "timestamp": "2026-05-31T00:00:00Z", + "logLevel": "STANDARD", + "eventType": "_tool_request_event", + "event": { + "eventType": "_tool_request_event", + "id": "00000000-0000-0000-0000-000000000001", + "attributes": {"model": "qwen3:1.7b", "tool_calls": tool_calls}, + }, + } + + +def _function_tool_call(name: str, arguments: object) -> dict: + return { + "id": "00000000-0000-0000-0000-0000000000aa", + "type": "function", + "function": {"name": name, "arguments": arguments}, + } + + +def _write_log(log_dir: Path, records: list) -> None: + log_dir.mkdir(parents=True, exist_ok=True) + with (log_dir / "events-0.log").open("w") as handle: + for record in records: + handle.write(json.dumps(record) + "\n") + + +def test_collect_tool_invocations(tmp_path: Path) -> None: + """Parser extracts nested function.name/function.arguments from a tool request line. + + Records with a different eventType are ignored. + """ + log_dir = tmp_path / "event_logs" + other_event = { + "timestamp": "2026-05-31T00:00:00Z", + "logLevel": "STANDARD", + "eventType": "_input_event", + "event": {"eventType": "_input_event", "id": "x", "attributes": {}}, + } + _write_log( + log_dir, + [ + other_event, + _tool_request_record([_function_tool_call("add", {"a": 1, "b": 2})]), + ], + ) + + assert collect_tool_invocations(log_dir) == [ + {"name": "add", "arguments": {"a": 1, "b": 2}} + ] + + +def test_collect_tool_invocations_no_tool(tmp_path: Path) -> None: + """A run with no tool request event yields an empty list.""" + log_dir = tmp_path / "event_logs" + _write_log( + log_dir, + [ + { + "timestamp": "2026-05-31T00:00:00Z", + "logLevel": "STANDARD", + "eventType": "_output_event", + "event": { + "eventType": "_output_event", + "id": "y", + "attributes": {}, + }, + } + ], + ) + + assert collect_tool_invocations(log_dir) == [] + + +def test_assert_tool_invoked_dict_args() -> None: + """Passes when an invocation matches name and dict args (order-independent).""" + invocations = [{"name": "add", "arguments": {"b": 2, "a": 1}}] + assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) + + +def test_assert_tool_invoked_json_string_args() -> None: + """Passes when the recorded arguments are a JSON string rather than a dict.""" + invocations = [{"name": "add", "arguments": '{"a": 1, "b": 2}'}] + assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) + + +def test_assert_tool_invoked_none_args() -> None: + """A no-arg tool call recorded as ``None`` matches an expected empty dict.""" + invocations = [{"name": "now", "arguments": None}] + assert_tool_invoked(invocations, "now", {}) + + +def test_assert_tool_invoked_mismatch_reports_invocations() -> None: + """Raises AssertionError dumping the actual invocations on a mismatch.""" + invocations = [{"name": "add", "arguments": {"a": 9, "b": 9}}] + with pytest.raises(AssertionError) as exc_info: + assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) + assert "add" in str(exc_info.value) + assert "9" in str(exc_info.value) From 0a801566e11848f159c4d244a9e8a1ab0059a735 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 31 May 2026 16:55:34 -0700 Subject: [PATCH 2/5] [test] Assert tool invocations in yaml and react remote cross-language e2e tests Strengthen two more live-LLM Flink-path tests using the structured tool-invocation helper: - yaml cross-language test: replace the "22 in output" substring check with an assertion that the Java calculateBMI tool was invoked with the weight and height parsed from the input. - react agent remote-runner test: in addition to checking the output has a result field, assert the add and multiply tools were invoked with the expected arguments, including multiply's first argument being add's threaded result. Both read the tool calls from the event log via collect_tool_invocations. --- .../e2e_tests_integration/react_agent_test.py | 17 ++++++++++++++- .../yaml_cross_language_test.py | 21 ++++++++++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py index 04d0be41d..402d0b72d 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py @@ -44,7 +44,11 @@ ) from flink_agents.api.tools.tool import Tool from flink_agents.e2e_tests.e2e_tests_integration.react_agent_tools import add, multiply -from flink_agents.e2e_tests.test_utils import pull_model +from flink_agents.e2e_tests.test_utils import ( + assert_tool_invoked, + collect_tool_invocations, + pull_model, +) current_dir = Path(__file__).parent @@ -169,6 +173,10 @@ def test_react_agent_on_remote_runner( env.get_config().set(AgentExecutionOptions.MAX_RETRIES, 3) + log_dir = tmp_path / "event_logs" + log_dir.mkdir(parents=True, exist_ok=True) + env.get_config().set_str("baseLogDir", str(log_dir)) + # register resource to execution environment ( env.add_resource( @@ -244,3 +252,10 @@ def test_react_agent_on_remote_runner( "This may be caused by the LLM response does not match the output schema, you can rerun this case." ) assert "result" in json.loads(actual_result[0].strip()) + + # Input (a=1, b=2, c=3) computes (a + b) * c, so the agent must thread + # add(1, 2)=3 into multiply(3, 3)=9. Asserting the multiply first arg + # validates the reasoning chain, not just that each tool fired. + invocations = collect_tool_invocations(log_dir) + assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) + assert_tool_invoked(invocations, "multiply", {"a": 3, "b": 3}) diff --git a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_cross_language_test.py b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_cross_language_test.py index ee9ed231e..f26b23c45 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_cross_language_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_resource_cross_language/yaml_cross_language_test.py @@ -46,7 +46,11 @@ ) from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.e2e_tests.test_utils import pull_model +from flink_agents.e2e_tests.test_utils import ( + assert_tool_invoked, + collect_tool_invocations, + pull_model, +) current_dir = Path(__file__).parent _RESOURCES = current_dir.parent / "resources" @@ -116,6 +120,9 @@ def test_yaml_cross_language_agent( deserialize_datastream = input_datastream.map(lambda x: str(x)) agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) + log_dir = tmp_path / "event_logs" + log_dir.mkdir(parents=True, exist_ok=True) + agents_env.get_config().set_str("baseLogDir", str(log_dir)) agents_env.load_yaml(_RESOURCES / "yaml_cross_language_agent.yaml") output_datastream = ( @@ -152,12 +159,16 @@ def test_yaml_cross_language_agent( with file.open() as f: actual_result.extend(f.readlines()) - # Math path went through the Java ``calculateBMI`` tool: - # 70 / (1.75 * 1.75) ≈ 22.86, so the final answer should mention 22. - # Creative path doesn't use any tool. + # Math path went through the Java ``calculateBMI`` tool, called with the + # weight/height parsed from the input ("1.75 meters tall and weighs 70 kg"). + assert_tool_invoked( + collect_tool_invocations(log_dir), + "calculateBMI", + {"weightKg": 70, "heightM": 1.75}, + ) + # Creative path doesn't use any tool; its answer mentions a cat. # NOTE: We join all results and search without relying on order, because # StreamingFileSink may produce multiple part files and iterdir() does not # guarantee a deterministic traversal order across platforms. joined = "\n".join(actual_result).lower() - assert "22" in joined, f"math answer missing '22': {actual_result!r}" assert "cat" in joined, f"creative answer missing 'cat': {actual_result!r}" From d3794718b350a01711e76b229a9fbfe19b4c29aa Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 31 May 2026 17:15:21 -0700 Subject: [PATCH 3/5] [runtime] Capture tool request events in the local runner The Flink execution path persists tool events to an event log, but the pure-Python local runner does not, so tests on the local runner cannot assert which tools an agent invoked. Capture ToolRequestEvents as they pass through the local runner's event deque and expose them via get_tool_request_events() on both the runner and the local execution environment. The capture falls through to normal dispatch, so tool requests still reach their action. Add a tool_invocations_from_events adapter so in-memory events and event-log records yield the same tool-invocation shape for assertions. --- python/flink_agents/e2e_tests/test_utils.py | 31 ++++ .../e2e_tests/test_utils_unit_test.py | 21 +++ .../runtime/local_execution_environment.py | 5 + python/flink_agents/runtime/local_runner.py | 18 ++ .../tests/test_local_execution_environment.py | 163 +++++++++++++++++- 5 files changed, 236 insertions(+), 2 deletions(-) diff --git a/python/flink_agents/e2e_tests/test_utils.py b/python/flink_agents/e2e_tests/test_utils.py index 29794726b..7e8e7505b 100644 --- a/python/flink_agents/e2e_tests/test_utils.py +++ b/python/flink_agents/e2e_tests/test_utils.py @@ -21,6 +21,8 @@ from ollama import Client +from flink_agents.api.events.tool_event import ToolRequestEvent + current_dir = Path(__file__).parent @@ -78,6 +80,35 @@ def collect_tool_invocations(log_dir: str | Path) -> list[dict]: return invocations +def tool_invocations_from_events(events: list[ToolRequestEvent]) -> list[dict]: + """Normalize live ``ToolRequestEvent`` objects to the same invocation shape. + + Adapts the in-memory capture (the ``LocalRunner`` hook) to the same + ``{name, arguments}`` shape :func:`collect_tool_invocations` returns from the + event log, so both sources feed :func:`assert_tool_invoked` identically. Each + event's ``tool_calls`` is a list of nested ``{id, type, function:{name, + arguments}}`` dicts; order is preserved. + + Args: + events: ``ToolRequestEvent`` objects captured during a local run. + + Returns: + Ordered list of ``{"name": str, "arguments": dict | str}``, one per tool + call across all events. + """ + invocations = [] + for event in events: + for tool_call in event.tool_calls: + function = tool_call["function"] + invocations.append( + { + "name": function["name"], + "arguments": function["arguments"], + } + ) + return invocations + + def assert_tool_invoked(invocations: list[dict], name: str, arguments: dict) -> None: """Assert some invocation called tool ``name`` with arguments equal to ``arguments``. diff --git a/python/flink_agents/e2e_tests/test_utils_unit_test.py b/python/flink_agents/e2e_tests/test_utils_unit_test.py index f15089a4f..2236abd71 100644 --- a/python/flink_agents/e2e_tests/test_utils_unit_test.py +++ b/python/flink_agents/e2e_tests/test_utils_unit_test.py @@ -20,9 +20,11 @@ import pytest +from flink_agents.api.events.tool_event import ToolRequestEvent from flink_agents.e2e_tests.test_utils import ( assert_tool_invoked, collect_tool_invocations, + tool_invocations_from_events, ) @@ -127,3 +129,22 @@ def test_assert_tool_invoked_mismatch_reports_invocations() -> None: assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) assert "add" in str(exc_info.value) assert "9" in str(exc_info.value) + + +def test_tool_invocations_from_events() -> None: + """Live ToolRequestEvents normalize to the same {name, arguments} shape. + + One event carrying two tool calls yields one invocation per call, in order. + """ + event = ToolRequestEvent( + model="qwen3:1.7b", + tool_calls=[ + _function_tool_call("add", {"a": 1, "b": 2}), + _function_tool_call("multiply", {"a": 4444, "b": 312}), + ], + ) + + assert tool_invocations_from_events([event]) == [ + {"name": "add", "arguments": {"a": 1, "b": 2}}, + {"name": "multiply", "arguments": {"a": 4444, "b": 312}}, + ] diff --git a/python/flink_agents/runtime/local_execution_environment.py b/python/flink_agents/runtime/local_execution_environment.py index 890963072..c9768fc31 100644 --- a/python/flink_agents/runtime/local_execution_environment.py +++ b/python/flink_agents/runtime/local_execution_environment.py @@ -22,6 +22,7 @@ from pyflink.table import Schema, StreamTableEnvironment, Table from flink_agents.api.agents.agent import Agent +from flink_agents.api.events.tool_event import ToolRequestEvent from flink_agents.api.execution_environment import ( AgentBuilder, AgentsExecutionEnvironment, @@ -141,6 +142,10 @@ def execute(self, job_name: str | None = None) -> None: for output in outputs: self.__output.append(output) + def get_tool_request_events(self) -> List[ToolRequestEvent]: + """Get the ToolRequestEvents captured by the runner during execution.""" + return self.__runner.get_tool_request_events() + def from_datastream( self, input: DataStream, key_selector: KeySelector | Callable | None = None ) -> AgentBuilder: diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py index 078fd7f1d..38436a2c7 100644 --- a/python/flink_agents/runtime/local_runner.py +++ b/python/flink_agents/runtime/local_runner.py @@ -26,6 +26,7 @@ from flink_agents.api.agents.agent import Agent from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.events.tool_event import ToolRequestEvent from flink_agents.api.memory.long_term_memory import BaseLongTermMemory from flink_agents.api.memory_object import MemoryObject, MemoryType from flink_agents.api.metric_group import MetricGroup @@ -273,6 +274,8 @@ class LocalRunner(AgentRunner): Dictionary of active contexts indexed by key. __outputs: Outputs generated by agent execution. + __tool_request_events: + ToolRequestEvents observed during agent execution. __config: Internal configration. """ @@ -280,6 +283,7 @@ class LocalRunner(AgentRunner): __agent_plan: Any __keyed_contexts: Dict[Any, LocalRunnerContext] __outputs: List[Dict[str, Any]] + __tool_request_events: List[ToolRequestEvent] __config: AgentConfiguration def __init__(self, agent: Agent, config: AgentConfiguration) -> None: @@ -295,6 +299,7 @@ def __init__(self, agent: Agent, config: AgentConfiguration) -> None: self.__agent_plan = AgentPlan.from_agent(agent, config) self.__keyed_contexts = {} self.__outputs = [] + self.__tool_request_events = [] self.__config = config @override @@ -340,6 +345,9 @@ def run(self, **data: Dict[str, Any]) -> Any: if isinstance(event, OutputEvent): self.__outputs.append({key: event.output}) continue + if isinstance(event, ToolRequestEvent): + self.__tool_request_events.append(event) + # Fall through: the request must still dispatch to its action. event_type = event.get_type() for action in self.__agent_plan.get_actions(event_type): logger.info("key: %s, performing action: %s", key, action.name) @@ -367,3 +375,13 @@ def get_outputs(self) -> List[Dict[str, Any]]: The agent execution outputs. """ return self.__outputs + + def get_tool_request_events(self) -> List[ToolRequestEvent]: + """Get the ToolRequestEvents captured during agent execution. + + Returns: + ------- + List[ToolRequestEvent] + The ToolRequestEvents observed in the run loop, in order. + """ + return self.__tool_request_events diff --git a/python/flink_agents/runtime/tests/test_local_execution_environment.py b/python/flink_agents/runtime/tests/test_local_execution_environment.py index 51250c0c7..1c861b359 100644 --- a/python/flink_agents/runtime/tests/test_local_execution_environment.py +++ b/python/flink_agents/runtime/tests/test_local_execution_environment.py @@ -16,15 +16,29 @@ # limitations under the License. ################################################################################# import time -from typing import ClassVar +import uuid +from typing import Any, ClassVar, Dict, List, Sequence import pytest from flink_agents.api.agents.agent import Agent -from flink_agents.api.decorators import action +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.chat_models.chat_model import ( + BaseChatModelConnection, + BaseChatModelSetup, +) +from flink_agents.api.decorators import ( + action, + chat_model_connection, + chat_model_setup, + tool, +) +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent from flink_agents.api.events.event import Event, InputEvent, OutputEvent from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.resource import ResourceDescriptor, ResourceType from flink_agents.api.runner_context import RunnerContext +from flink_agents.api.tools.tool import ToolType class Agent1(Agent): @@ -218,3 +232,148 @@ def test_mixed_event_workflow() -> None: env.execute() assert output_list == [{"bob": "done:42"}] + + +# ── Tool-request capture hook (Track B) ────────────────────────────────── + + +class _ToolConnection(BaseChatModelConnection): + """Mock connection emitting a single ``add`` tool call.""" + + def chat( + self, + messages: Sequence[ChatMessage], + tools: List | None = None, + **kwargs: Any, + ) -> ChatMessage: + """Emit an ``add`` tool call, then echo the tool result as content.""" + last = messages[-1] + if last.role == MessageRole.TOOL: + return ChatMessage(role=MessageRole.ASSISTANT, content=str(last.content)) + tool_call = { + "id": str(uuid.uuid4()), + "type": ToolType.FUNCTION, + "function": {"name": "add", "arguments": {"a": 1, "b": 2}}, + } + return ChatMessage( + role=MessageRole.ASSISTANT, content="", tool_calls=[tool_call] + ) + + +class _ToolChatModel(BaseChatModelSetup): + """Mock setup binding the ``add`` tool to the connection.""" + + def open(self) -> None: + """Do nothing.""" + + @property + def model_kwargs(self) -> Dict[str, Any]: + """Return model kwargs.""" + return {} + + def chat( + self, + messages: Sequence[ChatMessage], + prompt_args: Dict[str, Any] | None = None, + **kwargs: Any, + ) -> ChatMessage: + """Bind tools and delegate to the connection.""" + server = self.resource_context.get_resource( + self.connection, ResourceType.CHAT_MODEL_CONNECTION + ) + tools = [ + self.resource_context.get_resource(name, ResourceType.TOOL) + for name in (self.tools or []) + ] + return server.chat(messages, tools=tools, **kwargs) + + +class ToolRequestAgent(Agent): + """Agent whose chat model emits a ToolRequestEvent dispatched to ``add``. + + The InputEvent action sends a ChatRequestEvent; the mock chat model returns + an ``add`` tool call, which the built-in chat/tool actions turn into a real + ToolRequestEvent flowing through the runner. The ToolRequestEvent is captured + by the runner AND still dispatched to ``tool_call_action`` — the final output + (the tool result) proves capture did not swallow the event. + """ + + @chat_model_connection + @staticmethod + def conn() -> ResourceDescriptor: + """Mock chat model connection.""" + return ResourceDescriptor( + clazz=f"{_ToolConnection.__module__}.{_ToolConnection.__name__}" + ) + + @chat_model_setup + @staticmethod + def model() -> ResourceDescriptor: + """Mock chat model bound to the ``add`` tool.""" + return ResourceDescriptor( + clazz=f"{_ToolChatModel.__module__}.{_ToolChatModel.__name__}", + connection="conn", + model="mock-model", + tools=["add"], + ) + + @tool + @staticmethod + def add(a: int, b: int) -> int: + """Return the sum of a and b. + + Parameters + ---------- + a : int + The first operand. + b : int + The second operand. + + Returns: + ------- + int: + The sum of a and b. + """ + return a + b + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def process_input(event: Event, ctx: RunnerContext) -> None: + """Send a ChatRequestEvent to drive the tool-calling flow.""" + input = InputEvent.from_event(event).input + ctx.send_event( + ChatRequestEvent( + model="model", + messages=[ChatMessage(role=MessageRole.USER, content=input)], + ) + ) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def process_response(event: Event, ctx: RunnerContext) -> None: + """Emit the final assistant content as output.""" + response = ChatResponseEvent.from_event(event).response + ctx.send_event(OutputEvent(output=response.content)) + + +def test_local_runner_captures_tool_request_events() -> None: + """A ToolRequestEvent is captured AND still dispatched to its action.""" + env = AgentsExecutionEnvironment.get_execution_environment() + + input_list = [] + agent = ToolRequestAgent() + + output_list = env.from_list(input_list).apply(agent).to_list() + + input_list.append({"key": "0001", "value": "add 1 and 2"}) + env.execute() + + captured = env.get_tool_request_events() + assert len(captured) == 1 + assert captured[0].tool_calls[0]["function"] == { + "name": "add", + "arguments": {"a": 1, "b": 2}, + } + # Dispatch was not swallowed: tool_call_action ran, producing the tool result + # that the model echoed back as the final output. + assert output_list == [{"0001": "3"}] From f85026c0a1a3e1e709385d85c9bef00db8ba2a83 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 31 May 2026 17:52:09 -0700 Subject: [PATCH 4/5] [test] Assert multiply tool invocation in react local-runner test Strengthen the react agent local-runner test, which previously asserted only the exact output value. It now also asserts the multiply tool was invoked with the threaded sum as its first argument, read from the local runner's captured tool events. Assert multiply rather than add because the small model frequently computes the addition itself and only calls the multiply tool, so an add assertion would be an unreliable signal. The kept value assertion is normalized to tolerate equivalent numeric representations; it remains because the final result is a separate model synthesis step that can be wrong even when the tool calls are correct. --- .../e2e_tests/e2e_tests_integration/react_agent_test.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py index 402d0b72d..0f662253b 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py @@ -48,6 +48,7 @@ assert_tool_invoked, collect_tool_invocations, pull_model, + tool_invocations_from_events, ) current_dir = Path(__file__).parent @@ -136,7 +137,13 @@ def test_react_agent_on_local_runner(monkeypatch: pytest.MonkeyPatch) -> None: assert len(output_list) == 1, ( "This may be caused by the LLM response does not match the output schema, you can rerun this case." ) - assert output_list[0]["0001"].result == 1386528 + assert int(output_list[0]["0001"].result) == 1386528 + + # multiply's first arg (4444 = 2123 + 2321) proves the addition was computed + # correctly and the multiply tool was used; the model often does the addition + # without the add tool, so add is not a reliable signal to assert on. + invocations = tool_invocations_from_events(env.get_tool_request_events()) + assert_tool_invoked(invocations, "multiply", {"a": 4444, "b": 312}) @pytest.mark.skipif( From 105be81a06f202b5dfa247b1cdac8b5b9d069b82 Mon Sep 17 00:00:00 2001 From: Weiqing Yang Date: Sun, 31 May 2026 20:40:34 -0700 Subject: [PATCH 5/5] [test] Align react remote-runner tool assertion with local-runner The remote-runner test asserted add(1, 2) and multiply(3, 3) on inputs (1, 2, 3). qwen3:1.7b reliably computes 1+2 in its head and calls multiply directly without the add tool, so the add assertion fails non-deterministically. The local-runner test already documents this and asserts only multiply(sum, c) on large inputs that force genuine tool use. Mirror that approach on the remote runner: use inputs (2123, 2321, 312), assert the exact result value, and assert multiply(4444, 312) read back through the event-log capture path. The multiply first arg (4444 = 2123 + 2321) proves the addition was computed correctly and threaded into multiply, without depending on whether the add tool fired. --- .../e2e_tests_integration/react_agent_test.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py index 0f662253b..f0c771785 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/react_agent_test.py @@ -160,7 +160,7 @@ def test_react_agent_on_remote_runner( t_env = StreamTableEnvironment.create(stream_execution_environment=stream_env) table = t_env.from_elements( - elements=[(1, 2, 3)], + elements=[(2123, 2321, 312)], schema=DataTypes.ROW( [ DataTypes.FIELD("a", DataTypes.INT()), @@ -258,11 +258,12 @@ def test_react_agent_on_remote_runner( assert len(actual_result) == 1, ( "This may be caused by the LLM response does not match the output schema, you can rerun this case." ) - assert "result" in json.loads(actual_result[0].strip()) + assert json.loads(actual_result[0].strip())["result"] == 1386528 - # Input (a=1, b=2, c=3) computes (a + b) * c, so the agent must thread - # add(1, 2)=3 into multiply(3, 3)=9. Asserting the multiply first arg - # validates the reasoning chain, not just that each tool fired. + # multiply's first arg (4444 = 2123 + 2321) proves the addition was computed + # correctly and threaded into multiply; the model often does the addition + # without the add tool, so add is not a reliable signal to assert on. This + # exercises the same reasoning chain as the local-runner test, but read back + # through the event-log capture path. invocations = collect_tool_invocations(log_dir) - assert_tool_invoked(invocations, "add", {"a": 1, "b": 2}) - assert_tool_invoked(invocations, "multiply", {"a": 3, "b": 3}) + assert_tool_invoked(invocations, "multiply", {"a": 4444, "b": 312})