Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions python/packages/core/agent_framework/_workflows/_agent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,62 @@ class AgentExecutorResponse:
agent_response: AgentResponse
full_conversation: list[Message]

def with_text(self, text: str) -> "AgentExecutorResponse":
"""Create a new AgentExecutorResponse with replaced text, preserving the conversation history.

Use this in custom executors that transform agent output text (e.g. upper-casing, summarising)
when you need downstream AgentExecutors to still have access to the full prior conversation.

Without this helper, sending a plain ``str`` from a custom executor breaks the context chain:
the downstream ``AgentExecutor.from_str`` handler only adds that one string to its cache and
loses all prior messages. By using ``with_text`` the response type stays
``AgentExecutorResponse``, so ``AgentExecutor.from_response`` is invoked instead and the full
conversation is preserved.

Args:
text: The replacement assistant message text.

Returns:
A new ``AgentExecutorResponse`` whose ``agent_response`` contains a single assistant
message with ``text``, and whose ``full_conversation`` is the prior conversation
(everything before the original agent turn) followed by the new assistant message.

Example:
.. code-block:: python

from agent_framework import AgentExecutorResponse, WorkflowContext, executor


@executor(
id="upper_case_executor",
input=AgentExecutorResponse,
output=AgentExecutorResponse,
workflow_output=str,
)
async def upper_case(
response: AgentExecutorResponse,
ctx: WorkflowContext[AgentExecutorResponse, str],
) -> None:
upper_text = response.agent_response.text.upper()
await ctx.send_message(response.with_text(upper_text))
await ctx.yield_output(upper_text)
"""
new_message = Message("assistant", [text])
new_agent_response = AgentResponse(messages=[new_message])

# Strip off the original agent turn and replace with the new text.
n_agent_messages = len(self.agent_response.messages)
prior_messages = (
self.full_conversation[:-n_agent_messages] if n_agent_messages else list(self.full_conversation)
)
new_full_conversation = [*prior_messages, new_message]

return AgentExecutorResponse(
executor_id=self.executor_id,
agent_response=new_agent_response,
full_conversation=new_full_conversation,
)


class AgentExecutor(Executor):
"""built-in executor that wraps an agent for handling messages.
Expand Down Expand Up @@ -183,7 +239,25 @@ async def from_str(
"""Accept a raw user prompt string and run the agent.

The new string input will be added to the cache which is used as the conversation context for the agent run.

Warning:
If the upstream executor received an ``AgentExecutorResponse`` but emits a plain
``str``, this handler will be invoked instead of ``from_response``. This resets
the conversation context because only the new string is added to the cache and
all prior messages from the upstream agent are lost.

To preserve the full conversation when transforming agent output in a custom
executor, use ``AgentExecutorResponse.with_text(...)`` so that the message type
stays ``AgentExecutorResponse`` and ``from_response`` is called instead.
"""
if not self._cache and ctx.source_executor_ids != ["Workflow"]:
logger.warning(
"AgentExecutor '%s': from_str handler invoked with an empty cache. "
"If you are chaining from an AgentExecutor, the upstream custom executor may be "
"emitting a plain str instead of using AgentExecutorResponse.with_text(...), "
"which causes the full conversation context to be lost.",
self.id,
)
self._cache.extend(normalize_messages_input(text))
await self._run_agent_and_emit(ctx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,19 @@ async def process(self, data: str, ctx: WorkflowContext[str]):
forward references. When provided, takes precedence over introspection from the
``WorkflowContext`` second generic parameter (W_OutT).

Warning:
When placing a custom ``@executor`` **between** two ``AgentExecutor`` nodes, be
careful about the output type. If the custom executor receives an
``AgentExecutorResponse`` but emits a plain ``str``, the downstream
``AgentExecutor.from_str`` handler is invoked instead of ``from_response``.
This resets the conversation context because only the new string is added to
the cache and all prior messages from the upstream agent are lost.

To preserve the full conversation, use
``AgentExecutorResponse.with_text(new_text)`` to create a new response that
keeps the prior history, and set ``output=AgentExecutorResponse`` on the
decorator.

Returns:
A FunctionExecutor instance that can be wired into a Workflow.

Expand Down
88 changes: 88 additions & 0 deletions python/packages/core/tests/workflow/test_full_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
WorkflowBuilder,
WorkflowContext,
WorkflowRunState,
executor,
handler,
)
from agent_framework.orchestrations import SequentialBuilder
Expand Down Expand Up @@ -478,3 +479,90 @@ async def test_from_response_preserves_service_session_id() -> None:
assert result.get_outputs() is not None

assert spy_agent._captured_service_session_id == "resp_PREVIOUS_RUN" # pyright: ignore[reportPrivateUsage]


@executor(
id="upper_case_executor",
input=AgentExecutorResponse,
output=AgentExecutorResponse,
workflow_output=str,
)
async def _upper_case_executor(
response: AgentExecutorResponse,
ctx: WorkflowContext[AgentExecutorResponse, str],
) -> None:
upper_text = response.agent_response.text.upper()
await ctx.send_message(response.with_text(upper_text))
await ctx.yield_output(upper_text)


async def test_with_text_preserves_full_conversation_through_custom_executor() -> None:
"""Custom executor using with_text must preserve the full conversation chain."""
# Mirrors the reproduction from issue #5246:
# agent1 ("User likes sky red") -> agent2 ("User likes sky blue") -> upper_case -> agent3 ("User likes sky green")
agent1 = AgentExecutor(
_SimpleAgent(id="agent1", name="ContextAgent1", reply_text="User likes sky red"), id="agent1"
)
agent2 = AgentExecutor(
_SimpleAgent(id="agent2", name="ContextAgent2", reply_text="User likes sky blue"), id="agent2"
)
agent3 = AgentExecutor(
_SimpleAgent(id="agent3", name="ContextAgent3", reply_text="User likes sky green"), id="agent3"
)
capturer = _CaptureFullConversation(id="capture")

wf = (
WorkflowBuilder(start_executor=agent1, output_executors=[capturer])
.add_chain([agent1, agent2, _upper_case_executor, agent3, capturer])
.build()
)

result = await wf.run("")
payload = next(o for o in result.get_outputs() if isinstance(o, dict))

# The final agent must see the full conversation: user, agent1, UPPER(agent2), agent3
assert payload["roles"] == ["user", "assistant", "assistant", "assistant"]
assert payload["texts"][1] == "User likes sky red"
assert payload["texts"][2] == "USER LIKES SKY BLUE"
assert payload["texts"][3] == "User likes sky green"


async def test_with_text_does_not_mutate_original() -> None:
"""with_text returns a new instance; the original must be unmodified."""
original = AgentExecutorResponse(
executor_id="test_exec",
agent_response=AgentResponse(messages=[Message("assistant", ["original reply"])]),
full_conversation=[Message("user", ["prompt"]), Message("assistant", ["original reply"])],
)

new = original.with_text("transformed reply")

assert new is not original
assert new.agent_response.text == "transformed reply"
assert new.full_conversation[-1].text == "transformed reply"
assert new.full_conversation[-1].role == "assistant"
# Original unchanged
assert original.agent_response.text == "original reply"
assert original.full_conversation[-1].text == "original reply"


async def test_with_text_strips_multi_message_agent_turn() -> None:
"""When the agent turn has multiple messages (tool calls), with_text strips all of them."""
tool_call = Message("assistant", ["<tool_call>"])
tool_result = Message("tool", ["<result>"])
final_reply = Message("assistant", ["actual answer"])
user_msg = Message("user", ["question"])

original = AgentExecutorResponse(
executor_id="exec",
agent_response=AgentResponse(messages=[tool_call, tool_result, final_reply]),
full_conversation=[user_msg, tool_call, tool_result, final_reply],
)

new = original.with_text("summarised answer")

# Only the pre-agent-turn messages should remain, plus the replacement
assert len(new.full_conversation) == 2
assert new.full_conversation[0].text == "question"
assert new.full_conversation[1].text == "summarised answer"
assert new.agent_response.text == "summarised answer"
Loading