Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 31 additions & 86 deletions test/components/agents/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.generators.chat.openai import OpenAIChatGenerator
from haystack.components.joiners.list_joiner import ListJoiner
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.core.component.types import OutputSocket
from haystack.dataclasses import ChatMessage, ToolCall
Expand Down Expand Up @@ -1629,93 +1630,40 @@ def test_rag_pipeline_messages_plus_user_prompt(self, document_store_with_docs,
assert "Relevant docs:" in rendered


@pytest.mark.integration
class TestAgentPipelineStaticToolInput:
"""
Regression test for the scheduling bug introduced by making the 'messages'
run parameter non-required in https://github.com/deepset-ai/haystack/pull/10638.

pipeline inputs:
query → history_parser # feeds the messages chain
filters → agent.retrieval_filters # static, sender=None ← the trigger
(files is optional / absent)
Pipeline shape
--------------
Two paths feed into a lazy-variadic joiner that collects messages for the Agent:

pipeline connections:
history_parser.messages → messages_joiner.values
files_processor.prompt → messages_joiner.values # needs 'files' (mandatory)
messages_joiner.values → system_concat.messages
system_concat.output → agent.messages
Path A (works): query → history_parser → messages_joiner.values
Path B (blocked): files=[] → files_processor (returns {}) → attachments_builder ──╳──→ messages_joiner.values

agent.tools = [ComponentTool(inputs_from_state={"documents": "docs"})]
messages_joiner.values → agent.messages
filters → agent.retrieval_filters (static input from pipeline.run data)

The bug
-------
When the optional 'files' pipeline input is NOT provided:
1. files_processor is BLOCKED (its mandatory 'files' input is absent).
2. messages_joiner stays DEFER_LAST.
3. system_concat is BLOCKED – cannot receive 'messages'.
4. agent.messages is therefore never delivered.

Meanwhile, 'filters' → agent.retrieval_filters (sender=None) fires the pipeline's
"user trigger" gate on the Agent's first visit. Because none of the Agent's
sockets are mandatory, can_component_run() returns True and the Agent gets
DEFER priority instead of BLOCKED.

The scheduler eventually pops the Agent (DEFER) from the queue — the only
non-BLOCKED component left — and runs it. _add_missing_input_defaults fills
messages=None, and Agent._initialize_fresh_execution raises:
1. history_parser runs → sends messages to messages_joiner.
2. files_processor runs with files=[] → returns {} (no output).
3. attachments_builder is BLOCKED — its mandatory processed_files input never arrives.
4. messages_joiner gets DEFER_LAST (priority=4): it has a lazy-variadic socket and attachments_builder hasn't
executed yet, so the joiner doesn't know if more data might still come. It keeps waiting.
5. agent gets DEFER (priority=3): retrieval_filters arrives with sender=None (static pipeline input), which
satisfies has_any_trigger() on the first visit. The Agent has no mandatory sockets, so can_component_run()
returns True. It also has no unresolved lazy-variadic sockets, so it gets DEFER rather than DEFER_LAST.
6. Since DEFER (3) < DEFER_LAST (4), the scheduler picks the Agent before the joiner runs.
The Agent executes without messages and raises:

ValueError("No messages provided to the Agent and neither
user_prompt nor system_prompt is set.")
"""

@pytest.fixture()
def search_tool(self):
return ComponentTool(
name="search",
description="Searches documents.",
component=PromptBuilder(template="{% for d in docs %}{{ d.content }}{% endfor %}"),
inputs_from_state={"documents": "docs"},
)

def _make_agent(self, search_tool):
chat_generator = MockChatGenerator()
agent = Agent(
chat_generator=chat_generator,
tools=[search_tool],
state_schema={"retrieval_filters": {"type": dict[str, Any]}, "documents": {"type": list[Document]}},
)
# Mock after __init__ so Agent sees the real 'tools' param in the signature.
chat_generator.run = MagicMock(return_value={"replies": [ChatMessage.from_assistant("done")]})
return agent

def test_agent_runs_prematurely_when_messages_predecessor_is_blocked(self, search_tool):
"""
Demonstrates the bug: the Agent executes without 'messages' when its
messages-providing predecessor chain is permanently BLOCKED.

Pipeline shape:
query → history_parser → messages_joiner.values
files=[]→ files_processor → attachments_builder → messages_joiner.values
messages_joiner → system_concat → agent.messages
filters → agent.retrieval_filters (static, triggers the user gate)

Scheduling sequence that exposes the bug:
1. history_parser runs (query provided) → sends to messages_joiner.
2. files_processor runs with files=[] → returns {} (_NO_OUTPUT_PRODUCED).
3. attachments_builder receives _NO_OUTPUT_PRODUCED → BLOCKED (mandatory
processed_files socket never filled).
4. messages_joiner is DEFER_LAST (lazy-variadic; attachments_builder
has not executed yet → are_all_lazy_variadic_sockets_resolved=False).
5. system_concat is BLOCKED (mandatory messages from messages_joiner
never received).
6. agent is DEFER (static retrieval_filters triggered the user gate;
no mandatory sockets → can_component_run=True).

DEFER (priority=3) < DEFER_LAST (priority=4) → the scheduler picks the
Agent before messages_joiner gets a chance to run. _add_missing_input_defaults
fills messages=None, and Agent._initialize_fresh_execution raises:
ValueError("No messages provided …")
"""
def test_agent_waits_for_messages_when_predecessor_is_blocked(self, weather_tool):

@component
class HistoryParser:
Expand All @@ -1741,37 +1689,34 @@ class AttachmentsBuilder:
def run(self, processed_files: list[str]) -> dict:
return {"prompt": [ChatMessage.from_user(f"Files: {processed_files}")]}

@component
class SystemConcat:
@component.output_types(output=list[ChatMessage])
def run(self, messages: list[ChatMessage]) -> dict:
return {"output": messages}

from haystack.components.joiners.list_joiner import ListJoiner

agent = self._make_agent(search_tool)
chat_generator = MockChatGenerator()
agent = Agent(
chat_generator=chat_generator,
tools=[weather_tool],
state_schema={"retrieval_filters": {"type": dict[str, Any]}},
)
chat_generator.run = MagicMock(return_value={"replies": [ChatMessage.from_assistant("done")]})

pipeline = Pipeline()
pipeline.add_component("history_parser", HistoryParser())
pipeline.add_component("files_processor", FilesProcessor())
pipeline.add_component("attachments_builder", AttachmentsBuilder())
pipeline.add_component("messages_joiner", ListJoiner(list[ChatMessage]))
pipeline.add_component("system_concat", SystemConcat())
pipeline.add_component("agent", agent)

pipeline.connect("history_parser.messages", "messages_joiner.values")
pipeline.connect("files_processor.processed_files", "attachments_builder.processed_files")
pipeline.connect("attachments_builder.prompt", "messages_joiner.values")
pipeline.connect("messages_joiner.values", "system_concat.messages")
pipeline.connect("system_concat.output", "agent.messages")
pipeline.connect("messages_joiner.values", "agent.messages")

# files=[] → files_processor produces no output → attachments_builder BLOCKED
# → messages_joiner stays DEFER_LAST → system_concat BLOCKED
# → messages_joiner stays DEFER_LAST
# → agent (DEFER) runs first without messages → ValueError
pipeline.run(
result = pipeline.run(
data={
"history_parser": {"query": "What case law applies?"},
"files_processor": {"files": []}, # empty → no output
"agent": {"retrieval_filters": {"field": "date", "value": "2024-01-01"}},
}
)
assert "agent" in result
Loading