Skip to content
80 changes: 77 additions & 3 deletions src/praisonai-agents/praisonaiagents/agent/chat_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def _process_stream_response(self, messages, temperature, start_time, formatted_
emit_events=True
)

def _chat_completion(self, messages, temperature=1.0, tools=None, stream=True, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, _retry_depth=0):
def _chat_completion(self, messages, temperature=1.0, tools=None, stream=None, reasoning_steps=False, task_name=None, task_description=None, task_id=None, response_format=None, _retry_depth=0):
start_time = time.time()

# --- Context compaction (opt-in via ExecutionConfig.context_compaction) ---
Expand Down Expand Up @@ -577,6 +577,39 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=True, r
# Use the new _format_tools_for_completion helper method
formatted_tools = self._format_tools_for_completion(tools)

# Smart fallback for streaming: try streaming first, fall back to non-streaming if unsupported
if stream is None:
# Auto-detect: prefer streaming for better UX, fallback if adapter doesn't support it
try:
# First attempt: try with streaming enabled for better user experience
stream_callback = self.stream_emitter.emit if hasattr(self, 'stream_emitter') else None
final_response = self._execute_unified_chat_completion(
messages=messages,
temperature=temperature,
tools=formatted_tools,
stream=True, # Try streaming first
reasoning_steps=reasoning_steps,
task_name=task_name,
task_description=task_description,
task_id=task_id,
response_format=response_format,
stream_callback=stream_callback,
emit_events=True
)
return final_response
except ValueError as e:
if "Streaming is not supported" in str(e):
# Fallback: retry with non-streaming for sync adapters
logging.debug(f"{self.name}: Streaming not supported by adapter, falling back to non-streaming")
stream = False # Set for the main execution below
else:
raise # Re-raise if it's a different ValueError
except Exception:
# For any other exception, fall back to non-streaming
logging.debug(f"{self.name}: Streaming attempt failed, falling back to non-streaming")
stream = False # Set for the main execution below

# If stream was explicitly set or fallback occurred, use the specified/fallback value
try:
# NEW: Unified protocol dispatch path (Issue #1304, #1362)
# UNIFIED: Single protocol-driven dispatch path (fixes DRY violation)
Expand Down Expand Up @@ -774,7 +807,7 @@ def _execute_unified_chat_completion(
messages,
temperature=1.0,
tools=None,
stream=True,
stream=None,
reasoning_steps=False,
task_name=None,
task_description=None,
Expand Down Expand Up @@ -806,6 +839,47 @@ def _execute_unified_chat_completion(
# Cache the dispatcher
self._unified_dispatcher = dispatcher

# Smart fallback for streaming: try streaming first, fall back to non-streaming if unsupported
if stream is None:
# Auto-detect: prefer streaming for better UX, fallback if adapter doesn't support it
try:
# First attempt: try with streaming enabled for better user experience
if stream_callback is None and hasattr(self, 'stream_emitter'):
stream_callback = getattr(self.stream_emitter, 'emit', None)
final_response = self._unified_dispatcher.chat_completion(
messages=messages,
tools=tools,
tool_choice=getattr(self, 'tool_choice', None),
temperature=temperature,
max_tokens=getattr(self, 'max_tokens', None),
stream=True, # Try streaming first
response_format=response_format,
execute_tool_fn=getattr(self, 'execute_tool', None),
console=self.console if (self.verbose or True) else None, # Enable console for streaming
display_fn=self._display_generating if self.verbose else None,
stream_callback=stream_callback,
emit_events=emit_events,
verbose=self.verbose,
max_iterations=10,
reasoning_steps=reasoning_steps,
task_name=task_name,
task_description=task_description,
task_id=task_id,
agent_name=getattr(self, 'name', 'assistant')
)
return final_response
except ValueError as e:
if "Streaming is not supported" in str(e):
# Fallback: retry with non-streaming for sync adapters
logging.debug(f"Agent: Streaming not supported by adapter, falling back to non-streaming")
stream = False # Set for the main execution below
else:
raise # Re-raise if it's a different ValueError
except Exception:
# For any other exception, fall back to non-streaming
logging.debug(f"Agent: Streaming attempt failed, falling back to non-streaming")
stream = False # Set for the main execution below

# Execute unified dispatch with all necessary parameters
# Includes all parameters from both legacy paths to ensure full compatibility
try:
Expand Down Expand Up @@ -857,7 +931,7 @@ async def _execute_unified_achat_completion(
messages,
temperature=1.0,
tools=None,
stream=True,
stream=True, # Async methods keep stream=True default (async adapters support streaming vs sync smart fallback)
reasoning_steps=False,
task_name=None,
task_description=None,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import ast
from pathlib import Path


def _chat_mixin_path() -> Path:
for candidate in Path(__file__).resolve().parents:
chat_mixin = candidate / "praisonaiagents" / "agent" / "chat_mixin.py"
if chat_mixin.exists():
return chat_mixin
raise FileNotFoundError("Could not locate praisonaiagents/agent/chat_mixin.py")


def get_chat_mixin_stream_parameter_default(function_name: str):
chat_mixin = _chat_mixin_path()
tree = ast.parse(chat_mixin.read_text(encoding="utf-8"))

for node in tree.body:
if isinstance(node, ast.ClassDef) and node.name == "ChatMixin":
for item in node.body:
if isinstance(item, ast.FunctionDef) and item.name == function_name:
args = item.args.args
defaults = item.args.defaults
first_default_index = len(args) - len(defaults)
for idx, arg in enumerate(args):
if arg.arg == "stream":
default_idx = idx - first_default_index
if default_idx < 0:
raise AssertionError(
f"Function '{function_name}' has no default for stream"
)
default = defaults[default_idx]
if isinstance(default, ast.Constant):
return default.value
raise AssertionError(
f"Function '{function_name}' stream default is non-constant"
)
raise AssertionError(f"Function '{function_name}' or stream parameter not found")


def test_chat_completion_stream_default_is_none():
assert get_chat_mixin_stream_parameter_default("_chat_completion") is None


def test_execute_unified_chat_completion_stream_default_is_none():
assert (
get_chat_mixin_stream_parameter_default("_execute_unified_chat_completion")
is None
)
Loading