From dcd6024cbecd706621ba8b699909883eaf233d43 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Wed, 28 Jan 2026 19:21:17 +0530 Subject: [PATCH 1/3] added language and env_conversation detection tool --- .../services/conversation_completion_tool.py | 88 +++++ .../services/language_detection_tool.py | 222 +++++++++++ .../services/pipecat_service.py | 357 ++++++------------ 3 files changed, 426 insertions(+), 241 deletions(-) create mode 100644 wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py create mode 100644 wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py diff --git a/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py b/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py new file mode 100644 index 00000000..c7f393b8 --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/conversation_completion_tool.py @@ -0,0 +1,88 @@ +""" +Conversation Completion Tool for Voice Agents + +Provides LLM-callable conversation ending capabilities +""" + +from typing import Dict, Any, Callable +from pipecat.services.llm_service import FunctionCallParams +from pipecat.frames.frames import TTSSpeakFrame, EndTaskFrame +from pipecat.processors.frame_processor import FrameDirection +from call_processing.log.logger import logger + + +class ConversationCompletionToolFactory: + """Factory for creating conversation completion tool with runtime context""" + + @staticmethod + def create_conversation_completion_tool( + task_container: Dict[str, Any], + ) -> Callable: + """ + Create conversation completion tool function with captured context + + Args: + task_container: Dictionary containing PipelineTask (populated after task creation) + Format: {'task': PipelineTask | None} + + Returns: + Async function compatible with Pipecat's function calling + """ + + async def end_conversation(params: FunctionCallParams): + """ + LLM-callable function to end the conversation gracefully + + This function is called by the LLM when it determines the user + wants to end the conversation. It sends a farewell message and + terminates the pipeline. + + Parameters (from LLM): + farewell_message: str - Optional custom farewell message + (defaults to standard goodbye) + """ + try: + # Get task from container + task = task_container.get('task') + if not task: + error_msg = ( + 'Pipeline task not initialized in conversation completion tool' + ) + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + # Extract parameters + arguments = params.arguments + farewell_message = arguments.get( + 'farewell_message', 'Thank you for using our service! Goodbye!' + ) + + logger.info( + f'Conversation completion tool called - Farewell: "{farewell_message}"' + ) + + # Send farewell message via TTS + await params.llm.push_frame(TTSSpeakFrame(farewell_message)) + + # End the conversation + await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) + + logger.info('Conversation ended by LLM decision') + + # Return success result + await params.result_callback( + { + 'success': True, + 'status': 'complete', + 'farewell_sent': True, + 'farewell_message': farewell_message, + } + ) + + except Exception as e: + error_msg = f'Error ending conversation: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback({'success': False, 'error': error_msg}) + + return end_conversation diff --git a/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py new file mode 100644 index 00000000..275c4e6b --- /dev/null +++ b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py @@ -0,0 +1,222 @@ +""" +Language Detection Tool for Multi-Language Voice Agents + +Provides LLM-callable language detection and switching capabilities +""" + +from typing import Dict, Any, List, Callable +from pipecat.services.llm_service import FunctionCallParams +from pipecat.frames.frames import ManuallySwitchServiceFrame, LLMMessagesUpdateFrame +from call_processing.log.logger import logger +from call_processing.constants.language_config import LANGUAGE_INSTRUCTIONS + + +class LanguageDetectionToolFactory: + """Factory for creating language detection tool with runtime context""" + + @staticmethod + def create_language_detection_tool( + task_container: Dict[str, Any], + stt_services: Dict[str, Any], + tts_services: Dict[str, Any], + context_container: Dict[str, Any], + supported_languages: List[str], + default_language: str, + language_state: Dict[str, Any], + ) -> Callable: + """ + Create language detection tool function with captured context + + Args: + task_container: Dictionary containing PipelineTask (populated after task creation) + Format: {'task': PipelineTask | None} + stt_services: Dictionary mapping language codes to STT services + tts_services: Dictionary mapping language codes to TTS services + context_container: Dictionary containing LLMContext (populated after context creation) + Format: {'context': LLMContext | None} + supported_languages: List of supported language codes + default_language: Default language code + language_state: Dictionary to track current language and switch count + Format: {'current_language': str, 'switch_count': int, 'original_system_prompt': str} + + Returns: + Async function compatible with Pipecat's function calling + """ + + async def detect_and_switch_language(params: FunctionCallParams): + """ + LLM-callable function to detect and switch conversation language + + This function is called by the LLM when it determines the user + wants to switch to a different language. It validates the request, + performs the service switch, and updates the system prompt. + + Parameters (from LLM): + target_language: str - Language code to switch to (e.g., 'es', 'hi', 'en') + user_intent: str - User's stated language preference (for logging) + """ + try: + # Get task and context from containers + task = task_container.get('task') + if not task: + error_msg = ( + 'Pipeline task not initialized in language detection tool' + ) + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + context = context_container.get('context') + if not context: + error_msg = 'LLM context not initialized in language detection tool' + logger.error(error_msg) + await params.result_callback({'success': False, 'error': error_msg}) + return + + # Extract parameters + arguments = params.arguments + target_language = arguments.get('target_language', '').lower() + user_intent = arguments.get('user_intent', 'Unknown') + + current_language = language_state.get( + 'current_language', default_language + ) + switch_count = language_state.get('switch_count', 0) + + logger.info( + f'Language detection tool called - Target: {target_language}, ' + f'Current: {current_language}, User intent: {user_intent}' + ) + + # Validation 1: Check if target language is supported + if target_language not in supported_languages: + error_msg = ( + f"Language '{target_language}' is not supported. " + f"Supported languages: {', '.join(supported_languages)}" + ) + logger.warning(error_msg) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + 'supported_languages': supported_languages, + } + ) + return + + # Validation 2: Check if already in target language + if target_language == current_language: + logger.info(f'Already using language: {target_language}') + await params.result_callback( + { + 'success': True, + 'message': f'Already using {target_language}', + 'current_language': current_language, + 'switch_performed': False, + } + ) + return + + # Validation 3: Check if services exist for target language + if ( + target_language not in stt_services + or target_language not in tts_services + ): + error_msg = ( + f'Services not configured for language: {target_language}' + ) + logger.error(error_msg) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + } + ) + return + + # Perform language switch + try: + target_stt = stt_services[target_language] + target_tts = tts_services[target_language] + + # Queue service switch frames + await task.queue_frames( + [ + ManuallySwitchServiceFrame(service=target_stt), + ManuallySwitchServiceFrame(service=target_tts), + ] + ) + + logger.info( + f'Switched STT/TTS services from {current_language} to {target_language}' + ) + + # Update system prompt with language instruction + language_instruction = LANGUAGE_INSTRUCTIONS.get( + target_language, + LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.'), + ) + + # Get base prompt without language instruction (must exist for multi-language) + base_prompt = language_state.get('original_system_prompt') + if not base_prompt: + error_msg = 'Original system prompt not found in language state' + logger.error(error_msg) + await params.result_callback( + {'success': False, 'error': error_msg} + ) + return + + # Append new language instruction to clean base prompt + updated_content = f'{base_prompt}\n\n{language_instruction}' + updated_system_message = { + 'role': 'system', + 'content': updated_content, + } + + # Update context + current_messages = context.get_messages() + new_messages = [updated_system_message] + current_messages[1:] + await task.queue_frame( + LLMMessagesUpdateFrame(new_messages, run_llm=False) + ) + + logger.info( + f'Updated system prompt with {target_language} instruction' + ) + + # Update state + language_state['current_language'] = target_language + language_state['switch_count'] = switch_count + 1 + + # Return success result + await params.result_callback( + { + 'success': True, + 'message': f'Language switched to {target_language}', + 'previous_language': current_language, + 'current_language': target_language, + 'switch_performed': True, + 'switch_count': language_state['switch_count'], + } + ) + + except Exception as e: + error_msg = f'Error switching services: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback( + { + 'success': False, + 'error': error_msg, + 'current_language': current_language, + } + ) + + except Exception as e: + error_msg = f'Error in language detection tool: {str(e)}' + logger.error(error_msg, exc_info=True) + await params.result_callback({'success': False, 'error': error_msg}) + + return detect_and_switch_language diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index ccf23002..6e333cf6 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -13,12 +13,6 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 -from pipecat.frames.frames import ( - TTSSpeakFrame, - EndTaskFrame, - ManuallySwitchServiceFrame, - LLMMessagesUpdateFrame, -) from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -27,11 +21,8 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) -from pipecat.processors.frame_processor import FrameProcessor, FrameDirection -from pipecat.processors.user_idle_processor import UserIdleProcessor from pipecat.processors.transcript_processor import ( TranscriptProcessor, - TranscriptionMessage, ) from pipecat.pipeline.service_switcher import ( ServiceSwitcher, @@ -51,101 +42,17 @@ TurnAnalyzerUserTurnStopStrategy, # TranscriptionUserTurnStopStrategy ) -from pipecat.services.llm_service import FunctionCallParams from call_processing.services.stt_service import STTServiceFactory from call_processing.services.tts_service import TTSServiceFactory from call_processing.services.llm_service import LLMServiceFactory +from call_processing.services.conversation_completion_tool import ( + ConversationCompletionToolFactory, +) from call_processing.constants.language_config import ( - LANGUAGE_KEYWORDS, LANGUAGE_INSTRUCTIONS, ) -# Advanced handler with retry logic -async def handle_user_idle(processor: FrameProcessor, retry_count): - if retry_count == 1: - # First attempt - gentle reminder - await processor.push_frame(TTSSpeakFrame('Are you still there?')) - return True # Continue monitoring - elif retry_count == 2: - # Second attempt - more direct prompt - await processor.push_frame( - TTSSpeakFrame('Would you like to continue our conversation?') - ) - return True # Continue monitoring - else: - # Third attempt - end conversation - await processor.push_frame( - TTSSpeakFrame("I'll leave you for now. Have a nice day!") - ) - await processor.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - return False # Stop monitoring - - -async def evaluate_completion_criteria(params: FunctionCallParams): - """ - Check if the last user message contains goodbye-related phrases. - Returns True if goodbye detected, False otherwise. - """ - context = params.context - - # Get the conversation messages - messages = context.get_messages() - - # Find the last user message - last_user_message = None - for message in reversed(messages): - if message.get('role') == 'user': - last_user_message = message.get('content', '').lower() - break - - # If no user message found, conversation is not complete - if not last_user_message: - return False - - # List of goodbye phrases to check - goodbye_phrases = [ - 'goodbye', - 'bye', - 'good bye', - 'see you', - 'talk to you later', - 'ttyl', - 'have a good day', - 'take care', - 'farewell', - 'later', - 'peace out', - ] - - # Check if any goodbye phrase is in the message - return any(phrase in last_user_message for phrase in goodbye_phrases) - - -async def check_conversation_complete(params: FunctionCallParams): - """ - Function to check if conversation should end based on goodbye detection. - """ - # Check if goodbye is present - conversation_complete = await evaluate_completion_criteria(params) - - if conversation_complete: - # Send farewell message - await params.llm.push_frame( - TTSSpeakFrame('Thank you for using our service! Goodbye!') - ) - # End the conversation - await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM) - - # Return result to LLM - await params.result_callback( - { - 'status': 'complete' if conversation_complete else 'continuing', - 'goodbye_detected': conversation_complete, - } - ) - - class PipecatService: """Service for creating and running Pipecat pipelines""" @@ -189,6 +96,13 @@ async def run_conversation( f'default: {default_language}, multi-language: {is_multi_language}' ) + # Track language state for multi-language conversations + language_state = { + 'current_language': default_language, + 'switch_count': 0, + 'original_system_prompt': '', + } + # Create LLM service (language-agnostic) llm = LLMServiceFactory.create_llm_service(llm_config) @@ -290,11 +204,26 @@ async def run_conversation( tts = TTSServiceFactory.create_tts_service(tts_config_with_params) # Create initial messages with system prompt + base_system_prompt = ( + f'Customer phone number: {customer_number}\n' + + agent_config['system_prompt'] + ) + + # Add language instruction for default language if multi-language + if is_multi_language: + initial_language_instruction = LANGUAGE_INSTRUCTIONS.get( + default_language, LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.') + ) + system_content = f'{base_system_prompt}\n\n{initial_language_instruction}' + # Store base prompt without language instruction for switching + language_state['original_system_prompt'] = base_system_prompt + else: + system_content = base_system_prompt + messages = [ { 'role': 'system', - 'content': f'Customer phone number: {customer_number}\n' - + agent_config['system_prompt'], + 'content': system_content, } ] @@ -326,25 +255,101 @@ async def run_conversation( else: logger.info(f'No tools configured for agent {agent_id}') - # Register built-in function handler with LLM service - llm.register_function( - 'check_conversation_complete', check_conversation_complete - ) + # Create containers for late binding (populated after creation) + task_container = {'task': None} + context_container = {'context': None} + + # Register language detection tool if multi-language enabled + if is_multi_language: + from call_processing.services.language_detection_tool import ( + LanguageDetectionToolFactory, + ) - # Create FunctionSchema for check_conversation_complete - check_complete_schema = FunctionSchema( - name='check_conversation_complete', - description='Check if conversation should end based on goodbye detection', - properties={}, # No parameters needed + language_detection_func = ( + LanguageDetectionToolFactory.create_language_detection_tool( + task_container=task_container, + stt_services=stt_services, + tts_services=tts_services, + context_container=context_container, + supported_languages=supported_languages, + default_language=default_language, + language_state=language_state, + ) + ) + + llm.register_function('detect_and_switch_language', language_detection_func) + logger.info('Registered language detection tool with LLM') + + # Register conversation completion tool + conversation_completion_func = ( + ConversationCompletionToolFactory.create_conversation_completion_tool( + task_container=task_container + ) + ) + llm.register_function('end_conversation', conversation_completion_func) + logger.info('Registered conversation completion tool with LLM') + + # Create FunctionSchema for conversation completion + end_conversation_schema = FunctionSchema( + name='end_conversation', + description=( + 'Call this function when the user indicates they want to end the conversation. ' + 'This includes goodbye phrases, expressions of completion, or any indication ' + 'that the user wants to hang up or finish the call. Examples: "goodbye", "bye", ' + '"thank you", "that\'s all", "I\'m done", etc.' + ), + properties={ + 'farewell_message': { + 'type': 'string', + 'description': ( + 'Optional custom farewell message to say to the user before ending. ' + 'If not provided, uses default: "Thank you for using our service! Goodbye!"' + ), + } + }, required=[], ) + # Create FunctionSchema for language detection (if multi-language) + language_detection_schemas = [] + if is_multi_language: + language_detection_schema = FunctionSchema( + name='detect_and_switch_language', + description=( + f"Detect and switch the conversation language when the user explicitly " + f"requests to speak in a different language. Call this when the user says " + f"something like 'switch to Spanish', 'I want to speak Hindi', 'let's talk in English', etc. " + f"Supported languages: {', '.join(supported_languages)}. " + f"Current language: {language_state['current_language']}." + ), + properties={ + 'target_language': { + 'type': 'string', + 'description': f"Target language code. Must be one of: {', '.join(supported_languages)}", + 'enum': supported_languages, + }, + 'user_intent': { + 'type': 'string', + 'description': "The user's statement indicating language preference (for logging)", + }, + }, + required=['target_language', 'user_intent'], + ) + language_detection_schemas.append(language_detection_schema) + # Combine all FunctionSchema objects for ToolsSchema - all_function_schemas = [check_complete_schema] + function_schemas + all_function_schemas = ( + [end_conversation_schema] + language_detection_schemas + function_schemas + ) tools_schema = ToolsSchema(standard_tools=all_function_schemas) # Create LLM context and aggregator context = LLMContext(messages, tools=tools_schema) + + # Populate context container for language detection tool (if multi-language) + if is_multi_language: + context_container['context'] = context + context_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( @@ -370,21 +375,11 @@ async def run_conversation( # Create transcript processor for language detection transcript = TranscriptProcessor() - # Track current language detection state (only for multi-language) - language_detected = {'detected': False, 'current_language': default_language} - - # Create user idle processor (fresh instance for each conversation) - user_idle = UserIdleProcessor( - callback=handle_user_idle, - timeout=4.0, - ) - # Build pipeline components list pipeline_components = [ transport.input(), # Audio input from Twilio stt, # Speech-to-Text (ServiceSwitcher for multi-lang, direct for single) transcript.user(), # Transcript processor for user messages - user_idle, # User idle detection context_aggregator.user(), # Add user message to context llm, # LLM processing tts, # Text-to-Speech (ServiceSwitcher for multi-lang, direct for single) @@ -406,132 +401,12 @@ async def run_conversation( # enable_usage_metrics=True, # report_only_initial_ttfb=True ), - idle_timeout_secs=20, # Safety net - allows UserIdleProcessor to complete 3 retries (4s each = 12s total) + idle_timeout_secs=20, ) - # Multi-language detection event handler + # Populate task container for language detection tool (if multi-language) if is_multi_language: - - @transcript.event_handler('on_transcript_update') - async def handle_language_detection(processor, frame): - """Detect language from first user message and switch services""" - - # Only detect once - if language_detected['detected']: - return - - messages: List[TranscriptionMessage] = frame.messages - - # Look for user messages - for message in messages: - if message.role == 'user': - message_content = message.content.lower().strip() - - # Skip empty messages - if not message_content: - continue - - logger.info( - f"Analyzing message for language detection: '{message_content}'" - ) - - # Check for language keywords - detected_lang = None - for lang_code in supported_languages: - keywords = LANGUAGE_KEYWORDS.get(lang_code, []) - for keyword in keywords: - if keyword.lower() in message_content: - detected_lang = lang_code - logger.info( - f'Language detected: {detected_lang} ' - f"(matched keyword: '{keyword}')" - ) - break - if detected_lang: - break - - # Use detected language or fallback to default - target_language = detected_lang or default_language - - if not detected_lang: - logger.info( - f'No language detected, using default: {default_language}' - ) - - # Mark detection as complete - language_detected['detected'] = True - language_detected['current_language'] = target_language - - # Only switch services if target language is different from default - if target_language != default_language: - if ( - target_language in stt_services - and target_language in tts_services - ): - target_stt = stt_services[target_language] - target_tts = tts_services[target_language] - - try: - await task.queue_frames( - [ - ManuallySwitchServiceFrame( - service=target_stt - ), - ManuallySwitchServiceFrame( - service=target_tts - ), - ] - ) - logger.info( - f'Switched STT/TTS services to language: {target_language}' - ) - except Exception as e: - logger.error( - f'Error switching services: {e}', exc_info=True - ) - else: - logger.info( - f'Language {target_language} is default, no service switch needed' - ) - - # Update LLM system prompt with language instruction - language_instruction = LANGUAGE_INSTRUCTIONS.get( - target_language, - LANGUAGE_INSTRUCTIONS.get('en', 'Respond in English.'), - ) - - # Get current system prompt and append language instruction - current_messages = context.get_messages() - if current_messages and len(current_messages) > 0: - system_message = current_messages[0] - else: - system_message = { - 'role': 'system', - 'content': agent_config['system_prompt'], - } - - updated_content = ( - f"{system_message['content']}\n\n{language_instruction}" - ) - updated_system_message = { - 'role': 'system', - 'content': updated_content, - } - - # Update context with new system message - new_messages = [updated_system_message] + current_messages[1:] - await task.queue_frame( - LLMMessagesUpdateFrame(new_messages, run_llm=False) - ) - - logger.info( - f'Updated LLM context with language instruction for {target_language}' - ) - - # Exit after first detection - break - - logger.info('Language detection event handler registered') + task_container['task'] = task # Register event handlers @transport.event_handler('on_client_connected') From 23e894c3f8e859c1d9950a3a8717fded84944874 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 2 Feb 2026 13:25:24 +0530 Subject: [PATCH 2/3] lang detection -> parallel pipeline instead of service switcher --- .../services/language_detection_tool.py | 45 ++--- .../services/pipecat_service.py | 182 +++++++++++++----- 2 files changed, 149 insertions(+), 78 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py index 275c4e6b..87b5b1e7 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py +++ b/wavefront/server/apps/call_processing/call_processing/services/language_detection_tool.py @@ -6,7 +6,7 @@ from typing import Dict, Any, List, Callable from pipecat.services.llm_service import FunctionCallParams -from pipecat.frames.frames import ManuallySwitchServiceFrame, LLMMessagesUpdateFrame +from pipecat.frames.frames import LLMMessagesUpdateFrame from call_processing.log.logger import logger from call_processing.constants.language_config import LANGUAGE_INSTRUCTIONS @@ -17,8 +17,8 @@ class LanguageDetectionToolFactory: @staticmethod def create_language_detection_tool( task_container: Dict[str, Any], - stt_services: Dict[str, Any], - tts_services: Dict[str, Any], + language_switcher: Any, + stt_language_switcher: Any, context_container: Dict[str, Any], supported_languages: List[str], default_language: str, @@ -30,8 +30,8 @@ def create_language_detection_tool( Args: task_container: Dictionary containing PipelineTask (populated after task creation) Format: {'task': PipelineTask | None} - stt_services: Dictionary mapping language codes to STT services - tts_services: Dictionary mapping language codes to TTS services + language_switcher: LanguageSwitcher instance that manages TTS routing + stt_language_switcher: STTLanguageSwitcher instance that manages STT routing context_container: Dictionary containing LLMContext (populated after context creation) Format: {'context': LLMContext | None} supported_languages: List of supported language codes @@ -118,39 +118,16 @@ async def detect_and_switch_language(params: FunctionCallParams): ) return - # Validation 3: Check if services exist for target language - if ( - target_language not in stt_services - or target_language not in tts_services - ): - error_msg = ( - f'Services not configured for language: {target_language}' - ) - logger.error(error_msg) - await params.result_callback( - { - 'success': False, - 'error': error_msg, - 'current_language': current_language, - } - ) - return - # Perform language switch try: - target_stt = stt_services[target_language] - target_tts = tts_services[target_language] - - # Queue service switch frames - await task.queue_frames( - [ - ManuallySwitchServiceFrame(service=target_stt), - ManuallySwitchServiceFrame(service=target_tts), - ] - ) + # Update TTS language switcher state + language_switcher.set_language(target_language) + + # Update STT language switcher state + stt_language_switcher.set_language(target_language) logger.info( - f'Switched STT/TTS services from {current_language} to {target_language}' + f'Switched TTS and STT language from {current_language} to {target_language}' ) # Update system prompt with language instruction diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 6e333cf6..675ec733 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -13,6 +13,8 @@ from pipecat.adapters.schemas.tools_schema import ToolsSchema from pipecat.adapters.schemas.function_schema import FunctionSchema from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.frames.frames import Frame +from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -21,13 +23,15 @@ LLMContextAggregatorPair, LLMUserAggregatorParams, ) +from pipecat.processors.filters.function_filter import FunctionFilter from pipecat.processors.transcript_processor import ( TranscriptProcessor, ) -from pipecat.pipeline.service_switcher import ( - ServiceSwitcher, - ServiceSwitcherStrategyManual, -) + +# from pipecat.pipeline.service_switcher import ( +# ServiceSwitcher, +# ServiceSwitcherStrategyManual, +# ) from pipecat.transports.base_transport import BaseTransport from pipecat.turns.user_mute import ( FunctionCallUserMuteStrategy, @@ -36,7 +40,7 @@ from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.turns.user_start import ( VADUserTurnStartStrategy, - MinWordsUserTurnStartStrategy, + # MinWordsUserTurnStartStrategy, ) from pipecat.turns.user_stop import ( TurnAnalyzerUserTurnStopStrategy, @@ -53,6 +57,99 @@ ) +class STTLanguageSwitcher(ParallelPipeline): + """ + ParallelPipeline that routes STT to different language-specific services + based on current language state. Same pattern as LanguageSwitcher for TTS. + """ + + def __init__( + self, + stt_services: Dict[str, Any], + supported_languages: List[str], + default_language: str, + ): + self._current_language = default_language + self._stt_services = stt_services + self._supported_languages = supported_languages + + # Build parallel routes: one per language + routes = [] + for lang_code in supported_languages: + filter_func = self._create_language_filter(lang_code) + stt_service = stt_services[lang_code] + routes.append([FunctionFilter(filter_func), stt_service]) + + super().__init__(*routes) + + def _create_language_filter(self, lang_code: str): + """Create filter function for specific language""" + + async def language_filter(_: Frame) -> bool: + return self._current_language == lang_code + + return language_filter + + @property + def current_language(self): + return self._current_language + + def set_language(self, language_code: str): + """Update current language (called by language detection tool)""" + if language_code in self._supported_languages: + self._current_language = language_code + logger.info(f'STTLanguageSwitcher: Language set to {language_code}') + else: + logger.warning(f'STTLanguageSwitcher: Invalid language {language_code}') + + +class LanguageSwitcher(ParallelPipeline): + """ + ParallelPipeline that routes TTS to different language-specific services + based on current language state. + """ + + def __init__( + self, + tts_services: Dict[str, Any], + supported_languages: List[str], + default_language: str, + ): + self._current_language = default_language + self._tts_services = tts_services + self._supported_languages = supported_languages + + # Build parallel routes: one per language + # Each route: [FunctionFilter, TTS service] + routes = [] + for lang_code in supported_languages: + filter_func = self._create_language_filter(lang_code) + tts_service = tts_services[lang_code] + routes.append([FunctionFilter(filter_func), tts_service]) + + super().__init__(*routes) + + def _create_language_filter(self, lang_code: str): + """Create filter function for specific language""" + + async def language_filter(_: Frame) -> bool: + return self._current_language == lang_code + + return language_filter + + @property + def current_language(self): + return self._current_language + + def set_language(self, language_code: str): + """Update current language (called by language detection tool)""" + if language_code in self._supported_languages: + self._current_language = language_code + logger.info(f'LanguageSwitcher: Language set to {language_code}') + else: + logger.warning(f'LanguageSwitcher: Invalid language {language_code}') + + class PipecatService: """Service for creating and running Pipecat pipelines""" @@ -124,8 +221,7 @@ async def run_conversation( 'parameters': stt_parameters or {}, } - # Create STT/TTS services with multi-language support if needed - stt_services = {} + # Create TTS services (one per language for multi-language mode) tts_services = {} if is_multi_language: @@ -133,7 +229,7 @@ async def run_conversation( f'Multi-language mode enabled for languages: {supported_languages}' ) - # Create STT/TTS services for each supported language + # Create TTS services for each supported language for lang_code in supported_languages: # Get voice ID for this language voice_id_for_lang = tts_voice_ids_dict.get(lang_code) @@ -143,59 +239,57 @@ async def run_conversation( ) voice_id_for_lang = default_voice_id - # Deep clone configs to avoid mutating original configs - stt_config_lang = deepcopy(stt_config_with_params) + # Deep clone config to avoid mutating original tts_config_lang = deepcopy(tts_config_with_params) - # Update language in parameters - if 'parameters' not in stt_config_lang: - stt_config_lang['parameters'] = {} - stt_config_lang['parameters']['language'] = lang_code - + # Update language parameters if 'parameters' not in tts_config_lang: tts_config_lang['parameters'] = {} tts_config_lang['parameters']['language'] = lang_code - - # Set language-specific voice ID tts_config_lang['voice_id'] = voice_id_for_lang - # Create services - stt_services[lang_code] = STTServiceFactory.create_stt_service( - stt_config_lang - ) + # Create TTS service tts_services[lang_code] = TTSServiceFactory.create_tts_service( tts_config_lang ) - logger.info( - f'Created STT/TTS services for language: {lang_code} ' + f'Created TTS service for language: {lang_code} ' f'with voice: {voice_id_for_lang}' ) - # Create service switchers with manual strategy - # Order services list with default language first (ServiceSwitcher uses first as initial) - stt_services_list = [] - tts_services_list = [] + # Create per-language STT services (same pattern as TTS) + stt_services = {} + for lang_code in supported_languages: + stt_config_lang = deepcopy(stt_config_with_params) + if 'parameters' not in stt_config_lang: + stt_config_lang['parameters'] = {} + stt_config_lang['parameters']['language'] = lang_code - # Add default language service first - if default_language in stt_services: - stt_services_list.append(stt_services[default_language]) - tts_services_list.append(tts_services[default_language]) + stt_services[lang_code] = STTServiceFactory.create_stt_service( + stt_config_lang + ) + logger.info(f'Created STT service for language: {lang_code}') - # Add remaining services - for lang_code in supported_languages: - if lang_code != default_language: - stt_services_list.append(stt_services[lang_code]) - tts_services_list.append(tts_services[lang_code]) + # Create STTLanguageSwitcher for STT routing + stt = STTLanguageSwitcher( + stt_services=stt_services, + supported_languages=supported_languages, + default_language=default_language, + ) + logger.info( + f'Initialized STTLanguageSwitcher with default language: {default_language}' + ) - stt = ServiceSwitcher( - services=stt_services_list, strategy_type=ServiceSwitcherStrategyManual + # Create LanguageSwitcher for TTS routing + tts = LanguageSwitcher( + tts_services=tts_services, + supported_languages=supported_languages, + default_language=default_language, ) - tts = ServiceSwitcher( - services=tts_services_list, strategy_type=ServiceSwitcherStrategyManual + logger.info( + f'Initialized LanguageSwitcher with default language: {default_language}' ) - logger.info(f'Initialized with default language: {default_language}') else: logger.info('Single language mode - no language detection needed') @@ -268,8 +362,8 @@ async def run_conversation( language_detection_func = ( LanguageDetectionToolFactory.create_language_detection_tool( task_container=task_container, - stt_services=stt_services, - tts_services=tts_services, + language_switcher=tts, # Pass the TTS LanguageSwitcher instance + stt_language_switcher=stt, # Pass the STT LanguageSwitcher instance context_container=context_container, supported_languages=supported_languages, default_language=default_language, @@ -356,7 +450,7 @@ async def run_conversation( user_turn_strategies=UserTurnStrategies( start=[ VADUserTurnStartStrategy(), - MinWordsUserTurnStartStrategy(min_words=3), + # MinWordsUserTurnStartStrategy(min_words=3), ], # List of start strategies stop=[ TurnAnalyzerUserTurnStopStrategy( From fd3fe8f11c7be32657503b4beb77199ce1c53f7c Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 2 Feb 2026 13:50:07 +0530 Subject: [PATCH 3/3] little prompt fix for tool --- .../call_processing/services/pipecat_service.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py index 675ec733..4db6a46e 100644 --- a/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py +++ b/wavefront/server/apps/call_processing/call_processing/services/pipecat_service.py @@ -410,9 +410,11 @@ async def run_conversation( language_detection_schema = FunctionSchema( name='detect_and_switch_language', description=( - f"Detect and switch the conversation language when the user explicitly " - f"requests to speak in a different language. Call this when the user says " - f"something like 'switch to Spanish', 'I want to speak Hindi', 'let's talk in English', etc. " + f"Detect and switch the conversation language. Call this whenever the user " + f"indicates a language preference, including: responding with a language name " + f"(e.g., 'Hindi', 'Spanish', 'English'), requesting a switch (e.g., 'switch to Hindi', " + f"'I want to speak in Spanish'), or selecting a language when asked for their preference. " + f"Even a single word like 'Hindi' or 'Spanish' should trigger this tool if it refers to a language choice. " f"Supported languages: {', '.join(supported_languages)}. " f"Current language: {language_state['current_language']}." ),