Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 91 additions & 104 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,108 +140,98 @@ async def event_loop_cycle(
)
invocation_state["event_loop_cycle_span"] = cycle_span

# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
if agent._interrupt_state.activated:
stop_reason: StopReason = "tool_use"
message = agent._interrupt_state.context["tool_use_message"]
# Skip model invocation if the latest message contains ToolUse
elif _has_tool_use_in_latest_message(agent.messages):
stop_reason = "tool_use"
message = agent.messages[-1]
else:
model_events = _handle_model_execution(
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
)
async for model_event in model_events:
if not isinstance(model_event, ModelStopReason):
yield model_event

stop_reason, message, *_ = model_event["stop"]
yield ModelMessageEvent(message=message)
with trace_api.use_span(cycle_span, end_on_exit=True):
# Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls.
if agent._interrupt_state.activated:
stop_reason: StopReason = "tool_use"
message = agent._interrupt_state.context["tool_use_message"]
# Skip model invocation if the latest message contains ToolUse
elif _has_tool_use_in_latest_message(agent.messages):
stop_reason = "tool_use"
message = agent.messages[-1]
else:
model_events = _handle_model_execution(
agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context
)
async for model_event in model_events:
if not isinstance(model_event, ModelStopReason):
yield model_event

stop_reason, message, *_ = model_event["stop"]
yield ModelMessageEvent(message=message)

try:
if stop_reason == "max_tokens":
"""
Handle max_tokens limit reached by the model.

When the model reaches its maximum token limit, this represents a potentially unrecoverable
state where the model's response was truncated. By default, Strands fails hard with an
MaxTokensReachedException to maintain consistency with other failure types.
"""
raise MaxTokensReachedException(
message=(
"Agent has reached an unrecoverable state due to max_tokens limit. "
"For more information see: "
"https://strandsagents.com/latest/user-guide/concepts/agents/agent-loop/#maxtokensreachedexception"
)
)

try:
if stop_reason == "max_tokens":
"""
Handle max_tokens limit reached by the model.

When the model reaches its maximum token limit, this represents a potentially unrecoverable
state where the model's response was truncated. By default, Strands fails hard with an
MaxTokensReachedException to maintain consistency with other failure types.
"""
raise MaxTokensReachedException(
message=(
"Agent has reached an unrecoverable state due to max_tokens limit. "
"For more information see: "
"https://strandsagents.com/latest/user-guide/concepts/agents/agent-loop/#maxtokensreachedexception"
if stop_reason == "tool_use":
# Handle tool execution
tool_events = _handle_tool_execution(
stop_reason,
message,
agent=agent,
cycle_trace=cycle_trace,
cycle_span=cycle_span,
cycle_start_time=cycle_start_time,
invocation_state=invocation_state,
tracer=tracer,
structured_output_context=structured_output_context,
)
async for tool_event in tool_events:
yield tool_event

return

# End the cycle and return results
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
# Set attributes before span auto-closes
tracer.end_event_loop_cycle_span(cycle_span, message)
except EventLoopException:
# Don't yield or log the exception - we already did it when we
# raised the exception and we don't need that duplication.
raise
except (ContextWindowOverflowException, MaxTokensReachedException) as e:
# Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException
raise e
except Exception as e:
# Handle any other exceptions
yield ForceStopEvent(reason=e)
logger.exception("cycle failed")
raise EventLoopException(e, invocation_state["request_state"]) from e

# Force structured output tool call if LLM didn't use it automatically
if structured_output_context.is_enabled and stop_reason == "end_turn":
if structured_output_context.force_attempted:
raise StructuredOutputException(
"The model failed to invoke the structured output tool even after it was forced."
)
structured_output_context.set_forced_mode()
logger.debug("Forcing structured output tool")
await agent._append_message(
{"role": "user", "content": [{"text": "You must format the previous response as structured output."}]}
)

if stop_reason == "tool_use":
# Handle tool execution
tool_events = _handle_tool_execution(
stop_reason,
message,
agent=agent,
cycle_trace=cycle_trace,
cycle_span=cycle_span,
cycle_start_time=cycle_start_time,
invocation_state=invocation_state,
tracer=tracer,
structured_output_context=structured_output_context,
events = recurse_event_loop(
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
)
async for tool_event in tool_events:
yield tool_event

async for typed_event in events:
yield typed_event
return

# End the cycle and return results
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes)
if cycle_span:
tracer.end_event_loop_cycle_span(
span=cycle_span,
message=message,
)
except EventLoopException as e:
if cycle_span:
tracer.end_span_with_error(cycle_span, str(e), e)

# Don't yield or log the exception - we already did it when we
# raised the exception and we don't need that duplication.
raise
except (ContextWindowOverflowException, MaxTokensReachedException) as e:
# Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException
if cycle_span:
tracer.end_span_with_error(cycle_span, str(e), e)
raise e
except Exception as e:
if cycle_span:
tracer.end_span_with_error(cycle_span, str(e), e)

# Handle any other exceptions
yield ForceStopEvent(reason=e)
logger.exception("cycle failed")
raise EventLoopException(e, invocation_state["request_state"]) from e

# Force structured output tool call if LLM didn't use it automatically
if structured_output_context.is_enabled and stop_reason == "end_turn":
if structured_output_context.force_attempted:
raise StructuredOutputException(
"The model failed to invoke the structured output tool even after it was forced."
)
structured_output_context.set_forced_mode()
logger.debug("Forcing structured output tool")
await agent._append_message(
{"role": "user", "content": [{"text": "You must format the previous response as structured output."}]}
)

events = recurse_event_loop(
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
)
async for typed_event in events:
yield typed_event
return

yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])
yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"])


async def recurse_event_loop(
Expand Down Expand Up @@ -325,7 +315,7 @@ async def _handle_model_execution(
model_id=model_id,
custom_trace_attributes=agent.trace_attributes,
)
with trace_api.use_span(model_invoke_span):
with trace_api.use_span(model_invoke_span, end_on_exit=True):
await agent.hooks.invoke_callbacks_async(
BeforeModelCallEvent(
agent=agent,
Expand Down Expand Up @@ -364,14 +354,12 @@ async def _handle_model_execution(
if stop_reason == "max_tokens":
message = recover_message_on_max_tokens_reached(message)

if model_invoke_span:
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
# Set attributes before span auto-closes
tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason)
break # Success! Break out of retry loop

except Exception as e:
if model_invoke_span:
tracer.end_span_with_error(model_invoke_span, str(e), e)

# Exception is automatically recorded by use_span with end_on_exit=True
await agent.hooks.invoke_callbacks_async(
AfterModelCallEvent(
agent=agent,
Expand Down Expand Up @@ -413,9 +401,6 @@ async def _handle_model_execution(
agent.event_loop_metrics.update_metrics(metrics)

except Exception as e:
if cycle_span:
tracer.end_span_with_error(cycle_span, str(e), e)

yield ForceStopEvent(reason=e)
logger.exception("cycle failed")
raise EventLoopException(e, invocation_state["request_state"]) from e
Expand Down Expand Up @@ -499,6 +484,7 @@ async def _handle_tool_execution(
interrupts,
structured_output=structured_output_result,
)
# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
if cycle_span:
tracer.end_event_loop_cycle_span(span=cycle_span, message=message)

Expand All @@ -516,6 +502,7 @@ async def _handle_tool_execution(

yield ToolResultMessageEvent(message=tool_result_message)

# Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle)
if cycle_span:
tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message)

Expand Down
41 changes: 32 additions & 9 deletions src/strands/telemetry/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,20 @@ def end_model_invoke_span(
) -> None:
"""End a model invocation span with results and metrics.

Note: When using with trace_api.use_span(end_on_exit=True), the span will be automatically
closed and exceptions recorded. This method just sets the necessary attributes.

Args:
span: The span to end.
span: The span to set attributes on.
message: The message response from the model.
usage: Token usage information from the model call.
metrics: Metrics from the model call.
stop_reason (StopReason): The reason the model stopped generating.
error: Optional exception if the model call failed.
stop_reason: The reason the model stopped generating.
error: Optional exception if the model call failed (not used when end_on_exit=True).
"""
# Set end time attribute
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

attributes: Dict[str, AttributeValue] = {
"gen_ai.usage.prompt_tokens": usage["inputTokens"],
"gen_ai.usage.input_tokens": usage["inputTokens"],
Expand Down Expand Up @@ -361,7 +367,12 @@ def end_model_invoke_span(
event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])},
)

self._end_span(span, attributes, error)
self._set_attributes(span, attributes)

# Note: self._end_span() is commented out because when using trace_api.use_span(end_on_exit=True),
# the span is automatically closed and exceptions are automatically recorded by OpenTelemetry.
# Status is also automatically set to UNSET (OK) on success or ERROR on exception.
# self._end_span(span, attributes, error)

def start_tool_call_span(
self,
Expand Down Expand Up @@ -493,7 +504,7 @@ def start_event_loop_cycle_span(
parent_span: Optional[Span] = None,
custom_trace_attributes: Optional[Mapping[str, AttributeValue]] = None,
**kwargs: Any,
) -> Optional[Span]:
) -> Span:
"""Start a new span for an event loop cycle.

Args:
Expand Down Expand Up @@ -537,13 +548,21 @@ def end_event_loop_cycle_span(
) -> None:
"""End an event loop cycle span with results.

Note: When using with trace_api.use_span(end_on_exit=True), the span will be automatically
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're forcing callers to close this now, right? If so, document that instead of When using with trace_api.use_span(end_on_exit=True), as that implies they can do this, but we're saying You must do this or close it explicitly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OTEL will end the span automatically when exiting the function (regardless of success or exception being thrown). Previously we ended the span and recorded the exception by ourselves in our tracer setup in the tracer.end_xxx method. This is not ideal as OTEL can record the exception by itself.

closed and exceptions recorded. This method just sets the necessary attributes.

Args:
span: The span to end.
span: The span to set attributes on.
message: The message response from this cycle.
tool_result_message: Optional tool result message if a tool was called.
error: Optional exception if the cycle failed.
error: Optional exception if the cycle failed (not used when end_on_exit=True).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is end_on_exit not True?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll remove this! It should be always True.

"""
attributes: Dict[str, AttributeValue] = {}
if not span:
return

# Set end time attribute
span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat())

event_attributes: Dict[str, AttributeValue] = {"message": serialize(message["content"])}

if tool_result_message:
Expand All @@ -566,7 +585,11 @@ def end_event_loop_cycle_span(
)
else:
self._add_event(span, "gen_ai.choice", event_attributes=event_attributes)
self._end_span(span, attributes, error)

# Note: self._end_span() is commented out because when using trace_api.use_span(end_on_exit=True),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callers have no control over this; we can leave an explaining comment saying "We don't use self._end_span() because callers are responsible via closing"

But don't leave this commented out, just remove it

# the span is automatically closed and exceptions are automatically recorded by OpenTelemetry.
# Status is also automatically set to UNSET (OK) on success or ERROR on exception.
# self._end_span(span, attributes, error)

def start_agent_span(
self,
Expand Down
5 changes: 0 additions & 5 deletions tests/strands/event_loop/test_event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,6 @@ async def test_event_loop_tracing_with_model_error(
)
await alist(stream)

# Verify error handling span methods were called
mock_tracer.end_span_with_error.assert_called_once_with(model_span, "Input too long", model.stream.side_effect)


@pytest.mark.asyncio
async def test_event_loop_cycle_max_tokens_exception(
Expand Down Expand Up @@ -698,8 +695,6 @@ async def test_event_loop_tracing_with_throttling_exception(
)
await alist(stream)

# Verify error span was created for the throttling exception
assert mock_tracer.end_span_with_error.call_count == 1
# Verify span was created for the successful retry
assert mock_tracer.start_model_invoke_span.call_count == 2
assert mock_tracer.end_model_invoke_span.call_count == 1
Expand Down
11 changes: 0 additions & 11 deletions tests/strands/telemetry/test_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ def test_end_model_invoke_span(mock_span):
"gen_ai.choice",
attributes={"message": json.dumps(message["content"]), "finish_reason": "end_turn"},
)
mock_span.set_status.assert_called_once_with(StatusCode.OK)
mock_span.end.assert_called_once()


def test_end_model_invoke_span_latest_conventions(mock_span, monkeypatch):
Expand Down Expand Up @@ -284,9 +282,6 @@ def test_end_model_invoke_span_latest_conventions(mock_span, monkeypatch):
},
)

mock_span.set_status.assert_called_once_with(StatusCode.OK)
mock_span.end.assert_called_once()


def test_start_tool_call_span(mock_tracer):
"""Test starting a tool call span."""
Expand Down Expand Up @@ -650,8 +645,6 @@ def test_end_event_loop_cycle_span(mock_span):
"tool.result": json.dumps(tool_result_message["content"]),
},
)
mock_span.set_status.assert_called_once_with(StatusCode.OK)
mock_span.end.assert_called_once()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be asserting that it was closed correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not closed in our tracer anymore, but I can add validation in other places.



def test_end_event_loop_cycle_span_latest_conventions(mock_span, monkeypatch):
Expand Down Expand Up @@ -687,8 +680,6 @@ def test_end_event_loop_cycle_span_latest_conventions(mock_span, monkeypatch):
)
},
)
mock_span.set_status.assert_called_once_with(StatusCode.OK)
mock_span.end.assert_called_once()


def test_start_agent_span(mock_tracer):
Expand Down Expand Up @@ -860,8 +851,6 @@ def test_end_model_invoke_span_with_cache_metrics(mock_span):
mock_span.set_attribute.assert_any_call("gen_ai.usage.cache_write_input_tokens", 3)
mock_span.set_attribute.assert_any_call("gen_ai.server.request.duration", 10)
mock_span.set_attribute.assert_any_call("gen_ai.server.time_to_first_token", 5)
mock_span.set_status.assert_called_once_with(StatusCode.OK)
mock_span.end.assert_called_once()


def test_end_agent_span_with_cache_metrics(mock_span):
Expand Down
Loading