diff --git a/src/strands/event_loop/event_loop.py b/src/strands/event_loop/event_loop.py index 186ead708..3d13490ab 100644 --- a/src/strands/event_loop/event_loop.py +++ b/src/strands/event_loop/event_loop.py @@ -140,108 +140,98 @@ async def event_loop_cycle( ) invocation_state["event_loop_cycle_span"] = cycle_span - # Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls. - if agent._interrupt_state.activated: - stop_reason: StopReason = "tool_use" - message = agent._interrupt_state.context["tool_use_message"] - # Skip model invocation if the latest message contains ToolUse - elif _has_tool_use_in_latest_message(agent.messages): - stop_reason = "tool_use" - message = agent.messages[-1] - else: - model_events = _handle_model_execution( - agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context - ) - async for model_event in model_events: - if not isinstance(model_event, ModelStopReason): - yield model_event - - stop_reason, message, *_ = model_event["stop"] - yield ModelMessageEvent(message=message) + with trace_api.use_span(cycle_span, end_on_exit=True): + # Skipping model invocation if in interrupt state as interrupts are currently only supported for tool calls. + if agent._interrupt_state.activated: + stop_reason: StopReason = "tool_use" + message = agent._interrupt_state.context["tool_use_message"] + # Skip model invocation if the latest message contains ToolUse + elif _has_tool_use_in_latest_message(agent.messages): + stop_reason = "tool_use" + message = agent.messages[-1] + else: + model_events = _handle_model_execution( + agent, cycle_span, cycle_trace, invocation_state, tracer, structured_output_context + ) + async for model_event in model_events: + if not isinstance(model_event, ModelStopReason): + yield model_event + + stop_reason, message, *_ = model_event["stop"] + yield ModelMessageEvent(message=message) + + try: + if stop_reason == "max_tokens": + """ + Handle max_tokens limit reached by the model. + + When the model reaches its maximum token limit, this represents a potentially unrecoverable + state where the model's response was truncated. By default, Strands fails hard with an + MaxTokensReachedException to maintain consistency with other failure types. + """ + raise MaxTokensReachedException( + message=( + "Agent has reached an unrecoverable state due to max_tokens limit. " + "For more information see: " + "https://strandsagents.com/latest/user-guide/concepts/agents/agent-loop/#maxtokensreachedexception" + ) + ) - try: - if stop_reason == "max_tokens": - """ - Handle max_tokens limit reached by the model. - - When the model reaches its maximum token limit, this represents a potentially unrecoverable - state where the model's response was truncated. By default, Strands fails hard with an - MaxTokensReachedException to maintain consistency with other failure types. - """ - raise MaxTokensReachedException( - message=( - "Agent has reached an unrecoverable state due to max_tokens limit. " - "For more information see: " - "https://strandsagents.com/latest/user-guide/concepts/agents/agent-loop/#maxtokensreachedexception" + if stop_reason == "tool_use": + # Handle tool execution + tool_events = _handle_tool_execution( + stop_reason, + message, + agent=agent, + cycle_trace=cycle_trace, + cycle_span=cycle_span, + cycle_start_time=cycle_start_time, + invocation_state=invocation_state, + tracer=tracer, + structured_output_context=structured_output_context, ) + async for tool_event in tool_events: + yield tool_event + + return + + # End the cycle and return results + agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes) + # Set attributes before span auto-closes + tracer.end_event_loop_cycle_span(cycle_span, message) + except EventLoopException: + # Don't yield or log the exception - we already did it when we + # raised the exception and we don't need that duplication. + raise + except (ContextWindowOverflowException, MaxTokensReachedException) as e: + # Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException + raise e + except Exception as e: + # Handle any other exceptions + yield ForceStopEvent(reason=e) + logger.exception("cycle failed") + raise EventLoopException(e, invocation_state["request_state"]) from e + + # Force structured output tool call if LLM didn't use it automatically + if structured_output_context.is_enabled and stop_reason == "end_turn": + if structured_output_context.force_attempted: + raise StructuredOutputException( + "The model failed to invoke the structured output tool even after it was forced." + ) + structured_output_context.set_forced_mode() + logger.debug("Forcing structured output tool") + await agent._append_message( + {"role": "user", "content": [{"text": "You must format the previous response as structured output."}]} ) - if stop_reason == "tool_use": - # Handle tool execution - tool_events = _handle_tool_execution( - stop_reason, - message, - agent=agent, - cycle_trace=cycle_trace, - cycle_span=cycle_span, - cycle_start_time=cycle_start_time, - invocation_state=invocation_state, - tracer=tracer, - structured_output_context=structured_output_context, + events = recurse_event_loop( + agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context ) - async for tool_event in tool_events: - yield tool_event - + async for typed_event in events: + yield typed_event return - # End the cycle and return results - agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace, attributes) - if cycle_span: - tracer.end_event_loop_cycle_span( - span=cycle_span, - message=message, - ) - except EventLoopException as e: - if cycle_span: - tracer.end_span_with_error(cycle_span, str(e), e) - - # Don't yield or log the exception - we already did it when we - # raised the exception and we don't need that duplication. - raise - except (ContextWindowOverflowException, MaxTokensReachedException) as e: - # Special cased exceptions which we want to bubble up rather than get wrapped in an EventLoopException - if cycle_span: - tracer.end_span_with_error(cycle_span, str(e), e) - raise e - except Exception as e: - if cycle_span: - tracer.end_span_with_error(cycle_span, str(e), e) - - # Handle any other exceptions - yield ForceStopEvent(reason=e) - logger.exception("cycle failed") - raise EventLoopException(e, invocation_state["request_state"]) from e - - # Force structured output tool call if LLM didn't use it automatically - if structured_output_context.is_enabled and stop_reason == "end_turn": - if structured_output_context.force_attempted: - raise StructuredOutputException( - "The model failed to invoke the structured output tool even after it was forced." - ) - structured_output_context.set_forced_mode() - logger.debug("Forcing structured output tool") - await agent._append_message( - {"role": "user", "content": [{"text": "You must format the previous response as structured output."}]} - ) - - events = recurse_event_loop( - agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context - ) - async for typed_event in events: - yield typed_event - return - - yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"]) + yield EventLoopStopEvent(stop_reason, message, agent.event_loop_metrics, invocation_state["request_state"]) async def recurse_event_loop( @@ -325,7 +315,7 @@ async def _handle_model_execution( model_id=model_id, custom_trace_attributes=agent.trace_attributes, ) - with trace_api.use_span(model_invoke_span): + with trace_api.use_span(model_invoke_span, end_on_exit=True): await agent.hooks.invoke_callbacks_async( BeforeModelCallEvent( agent=agent, @@ -364,14 +354,12 @@ async def _handle_model_execution( if stop_reason == "max_tokens": message = recover_message_on_max_tokens_reached(message) - if model_invoke_span: - tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason) + # Set attributes before span auto-closes + tracer.end_model_invoke_span(model_invoke_span, message, usage, metrics, stop_reason) break # Success! Break out of retry loop except Exception as e: - if model_invoke_span: - tracer.end_span_with_error(model_invoke_span, str(e), e) - + # Exception is automatically recorded by use_span with end_on_exit=True await agent.hooks.invoke_callbacks_async( AfterModelCallEvent( agent=agent, @@ -413,9 +401,6 @@ async def _handle_model_execution( agent.event_loop_metrics.update_metrics(metrics) except Exception as e: - if cycle_span: - tracer.end_span_with_error(cycle_span, str(e), e) - yield ForceStopEvent(reason=e) logger.exception("cycle failed") raise EventLoopException(e, invocation_state["request_state"]) from e @@ -499,6 +484,7 @@ async def _handle_tool_execution( interrupts, structured_output=structured_output_result, ) + # Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle) if cycle_span: tracer.end_event_loop_cycle_span(span=cycle_span, message=message) @@ -516,6 +502,7 @@ async def _handle_tool_execution( yield ToolResultMessageEvent(message=tool_result_message) + # Set attributes before span auto-closes (span is managed by use_span in event_loop_cycle) if cycle_span: tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message) diff --git a/src/strands/telemetry/tracer.py b/src/strands/telemetry/tracer.py index 2f42d9988..3cbe3966c 100644 --- a/src/strands/telemetry/tracer.py +++ b/src/strands/telemetry/tracer.py @@ -319,14 +319,20 @@ def end_model_invoke_span( ) -> None: """End a model invocation span with results and metrics. + Note: When using with trace_api.use_span(end_on_exit=True), the span will be automatically + closed and exceptions recorded. This method just sets the necessary attributes. + Args: - span: The span to end. + span: The span to set attributes on. message: The message response from the model. usage: Token usage information from the model call. metrics: Metrics from the model call. - stop_reason (StopReason): The reason the model stopped generating. - error: Optional exception if the model call failed. + stop_reason: The reason the model stopped generating. + error: Optional exception if the model call failed (not used when end_on_exit=True). """ + # Set end time attribute + span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat()) + attributes: Dict[str, AttributeValue] = { "gen_ai.usage.prompt_tokens": usage["inputTokens"], "gen_ai.usage.input_tokens": usage["inputTokens"], @@ -361,7 +367,12 @@ def end_model_invoke_span( event_attributes={"finish_reason": str(stop_reason), "message": serialize(message["content"])}, ) - self._end_span(span, attributes, error) + self._set_attributes(span, attributes) + + # Note: self._end_span() is commented out because when using trace_api.use_span(end_on_exit=True), + # the span is automatically closed and exceptions are automatically recorded by OpenTelemetry. + # Status is also automatically set to UNSET (OK) on success or ERROR on exception. + # self._end_span(span, attributes, error) def start_tool_call_span( self, @@ -493,7 +504,7 @@ def start_event_loop_cycle_span( parent_span: Optional[Span] = None, custom_trace_attributes: Optional[Mapping[str, AttributeValue]] = None, **kwargs: Any, - ) -> Optional[Span]: + ) -> Span: """Start a new span for an event loop cycle. Args: @@ -537,13 +548,21 @@ def end_event_loop_cycle_span( ) -> None: """End an event loop cycle span with results. + Note: When using with trace_api.use_span(end_on_exit=True), the span will be automatically + closed and exceptions recorded. This method just sets the necessary attributes. + Args: - span: The span to end. + span: The span to set attributes on. message: The message response from this cycle. tool_result_message: Optional tool result message if a tool was called. - error: Optional exception if the cycle failed. + error: Optional exception if the cycle failed (not used when end_on_exit=True). """ - attributes: Dict[str, AttributeValue] = {} + if not span: + return + + # Set end time attribute + span.set_attribute("gen_ai.event.end_time", datetime.now(timezone.utc).isoformat()) + event_attributes: Dict[str, AttributeValue] = {"message": serialize(message["content"])} if tool_result_message: @@ -566,7 +585,11 @@ def end_event_loop_cycle_span( ) else: self._add_event(span, "gen_ai.choice", event_attributes=event_attributes) - self._end_span(span, attributes, error) + + # Note: self._end_span() is commented out because when using trace_api.use_span(end_on_exit=True), + # the span is automatically closed and exceptions are automatically recorded by OpenTelemetry. + # Status is also automatically set to UNSET (OK) on success or ERROR on exception. + # self._end_span(span, attributes, error) def start_agent_span( self, diff --git a/tests/strands/event_loop/test_event_loop.py b/tests/strands/event_loop/test_event_loop.py index 52980729c..14d6eb8a6 100644 --- a/tests/strands/event_loop/test_event_loop.py +++ b/tests/strands/event_loop/test_event_loop.py @@ -569,9 +569,6 @@ async def test_event_loop_tracing_with_model_error( ) await alist(stream) - # Verify error handling span methods were called - mock_tracer.end_span_with_error.assert_called_once_with(model_span, "Input too long", model.stream.side_effect) - @pytest.mark.asyncio async def test_event_loop_cycle_max_tokens_exception( @@ -698,8 +695,6 @@ async def test_event_loop_tracing_with_throttling_exception( ) await alist(stream) - # Verify error span was created for the throttling exception - assert mock_tracer.end_span_with_error.call_count == 1 # Verify span was created for the successful retry assert mock_tracer.start_model_invoke_span.call_count == 2 assert mock_tracer.end_model_invoke_span.call_count == 1 diff --git a/tests/strands/telemetry/test_tracer.py b/tests/strands/telemetry/test_tracer.py index 205748956..80b098cc9 100644 --- a/tests/strands/telemetry/test_tracer.py +++ b/tests/strands/telemetry/test_tracer.py @@ -246,8 +246,6 @@ def test_end_model_invoke_span(mock_span): "gen_ai.choice", attributes={"message": json.dumps(message["content"]), "finish_reason": "end_turn"}, ) - mock_span.set_status.assert_called_once_with(StatusCode.OK) - mock_span.end.assert_called_once() def test_end_model_invoke_span_latest_conventions(mock_span, monkeypatch): @@ -284,9 +282,6 @@ def test_end_model_invoke_span_latest_conventions(mock_span, monkeypatch): }, ) - mock_span.set_status.assert_called_once_with(StatusCode.OK) - mock_span.end.assert_called_once() - def test_start_tool_call_span(mock_tracer): """Test starting a tool call span.""" @@ -650,8 +645,6 @@ def test_end_event_loop_cycle_span(mock_span): "tool.result": json.dumps(tool_result_message["content"]), }, ) - mock_span.set_status.assert_called_once_with(StatusCode.OK) - mock_span.end.assert_called_once() def test_end_event_loop_cycle_span_latest_conventions(mock_span, monkeypatch): @@ -687,8 +680,6 @@ def test_end_event_loop_cycle_span_latest_conventions(mock_span, monkeypatch): ) }, ) - mock_span.set_status.assert_called_once_with(StatusCode.OK) - mock_span.end.assert_called_once() def test_start_agent_span(mock_tracer): @@ -860,8 +851,6 @@ def test_end_model_invoke_span_with_cache_metrics(mock_span): mock_span.set_attribute.assert_any_call("gen_ai.usage.cache_write_input_tokens", 3) mock_span.set_attribute.assert_any_call("gen_ai.server.request.duration", 10) mock_span.set_attribute.assert_any_call("gen_ai.server.time_to_first_token", 5) - mock_span.set_status.assert_called_once_with(StatusCode.OK) - mock_span.end.assert_called_once() def test_end_agent_span_with_cache_metrics(mock_span):