44import time
55from typing import List
66
7+ import litellm
78from litellm import acompletion
89from typing import Dict
910
1516
1617logger = logging .getLogger (__name__ )
1718
19+ litellm ._turn_on_debug () # pyright: ignore[reportPrivateImportUsage]
20+
1821
1922class SingleTurnRolloutProcessor (RolloutProcessor ):
2023 """Single turn rollout processor for direct LLM calls."""
@@ -35,7 +38,6 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
3538 request_params = {"messages" : messages_payload , ** config .completion_params }
3639 # Per-request cache control: set `no-cache` so this call alone skips any cached response
3740 request_params ["cache" ] = {"no-cache" : True }
38- # request_params["timeout"] = 1200 # 20 minutes timeout
3941 request_params ["stream" ] = True # Enable streaming
4042 # Single-level reasoning effort: expect `reasoning_effort` only
4143 effort_val = None
@@ -64,30 +66,27 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
6466 if row .tools is not None :
6567 request_params ["tools" ] = row .tools
6668
67- # Dynamic import to avoid static dependency/lint errors if LiteLLM isn't installed yet
68- import importlib
69-
70- _litellm = importlib .import_module ("litellm" )
71- acompletion = getattr (_litellm , "acompletion" )
69+ # _litellm = importlib.import_module("litellm")
70+ # acompletion = getattr(_litellm, "acompletion")
7271
73- # Handle streaming response - following LiteLLM docs pattern
72+ # Handle streaming response
7473 assistant_content = ""
7574 tool_calls = None
76- chunks = []
77-
78- response = await acompletion (** request_params )
79-
80- # Process streaming chunks
81- async for chunk in response :
82- chunks .append (chunk ) # Collect chunks for potential use with stream_chunk_builder
75+ usage_info = None
8376
77+ stream = await acompletion (** request_params )
78+ async for chunk in stream : # pyright: ignore[reportGeneralTypeIssues]
8479 if chunk .choices and len (chunk .choices ) > 0 :
8580 delta = chunk .choices [0 ].delta
8681 if hasattr (delta , "content" ) and delta .content :
8782 assistant_content += delta .content
8883 if hasattr (delta , "tool_calls" ) and delta .tool_calls :
8984 tool_calls = delta .tool_calls
9085
86+ # Keep the most recent non-empty usage payload seen on any chunk (usually the final one)
87+ if hasattr (chunk , "usage" ) and chunk .usage :
88+ usage_info = chunk .usage
89+
9190 converted_tool_calls = None
9291 if tool_calls :
9392 converted_tool_calls = []
@@ -127,26 +126,18 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
127126 )
128127 ]
129128
130- # Try to get usage info from chunks, fallback to estimates
131- usage_info = None
132- for chunk in reversed (chunks ): # Check last chunks first for usage info
133- if hasattr (chunk , "usage" ) and chunk .usage :
134- usage_info = chunk .usage
135- break
136-
137129 if usage_info :
138130 row .execution_metadata .usage = CompletionUsage (
139131 prompt_tokens = usage_info .prompt_tokens ,
140132 completion_tokens = usage_info .completion_tokens ,
141133 total_tokens = usage_info .total_tokens ,
142134 )
143135 else :
144- # Fallback estimates when streaming doesn't provide usage
145- estimated_completion_tokens = len (assistant_content .split ()) if assistant_content else 0
136+ # Fallback: record zero token counts when the stream reported no usage info
146137 row .execution_metadata .usage = CompletionUsage (
147138 prompt_tokens = 0 ,
148- completion_tokens = estimated_completion_tokens ,
149- total_tokens = estimated_completion_tokens ,
139+ completion_tokens = 0 ,
140+ total_tokens = 0 ,
150141 )
151142
152143 row .messages = messages
0 commit comments