|
8 | 8 | import json |
9 | 9 | import os |
10 | 10 | from uuid import UUID |
11 | | -from fastapi import APIRouter, WebSocket, Query, Depends, Form, Request |
12 | | -from fastapi.responses import Response, JSONResponse |
| 11 | +from call_processing.utils import normalize_indian_phone_number |
| 12 | +from fastapi import APIRouter, WebSocket, Query, Depends, Form |
| 13 | +from fastapi.responses import Response |
13 | 14 | from twilio.twiml.voice_response import VoiceResponse, Connect, Stream |
14 | 15 | from call_processing.log.logger import logger |
15 | 16 | from dependency_injector.wiring import inject, Provide |
@@ -114,85 +115,6 @@ async def inbound_webhook( |
114 | 115 | return Response(content=twiml_xml, media_type='application/xml') |
115 | 116 |
|
116 | 117 |
|
117 | | -@webhook_router.post('/exotel/inbound') |
118 | | -@inject |
119 | | -async def exotel_inbound_webhook( |
120 | | - request: Request, |
121 | | - voice_agent_cache_service: VoiceAgentCacheService = Depends( |
122 | | - Provide[ApplicationContainer.voice_agent_cache_service] |
123 | | - ), |
124 | | -): |
125 | | - """ |
126 | | - Exotel inbound webhook endpoint |
127 | | -
|
128 | | - Called by Exotel App Bazaar passthrough when an inbound call is received. |
129 | | - Looks up the voice agent by inbound phone number and returns WebSocket URL |
130 | | - along with agent info as JSON. |
131 | | -
|
132 | | - Expected request body (form or JSON): |
133 | | - CallSid: Exotel call identifier |
134 | | - From: Caller's phone number |
135 | | - To: Called phone number (the Exotel virtual number) |
136 | | - """ |
137 | | - # Parse body (support both form-encoded and JSON) |
138 | | - content_type = request.headers.get('content-type', '') |
139 | | - if 'application/json' in content_type: |
140 | | - data = await request.json() |
141 | | - else: |
142 | | - form = await request.form() |
143 | | - data = dict(form) |
144 | | - |
145 | | - call_sid = data.get('CallSid', '') |
146 | | - from_number = data.get('From', '') |
147 | | - to_number = data.get('To', '') |
148 | | - |
149 | | - # Mask phone numbers for privacy |
150 | | - masked_from = f'***{from_number[-4:]}' if len(from_number) > 4 else '****' |
151 | | - masked_to = f'***{to_number[-4:]}' if len(to_number) > 4 else '****' |
152 | | - logger.info( |
153 | | - f'Exotel inbound call received: From={masked_from}, To={masked_to}, CallSid={call_sid}' |
154 | | - ) |
155 | | - |
156 | | - # Look up agent by inbound number |
157 | | - agent = await voice_agent_cache_service.get_agent_by_inbound_number(to_number) |
158 | | - |
159 | | - if not agent: |
160 | | - logger.error(f'No voice agent found for inbound number: {to_number}') |
161 | | - return JSONResponse( |
162 | | - status_code=404, |
163 | | - content={'error': 'No voice agent configured for this number'}, |
164 | | - ) |
165 | | - |
166 | | - agent_id = agent['id'] |
167 | | - logger.info( |
168 | | - f'Agent found for inbound number {to_number}: {agent_id} ({agent["name"]})' |
169 | | - ) |
170 | | - |
171 | | - # Build WebSocket URL |
172 | | - base_url = os.getenv('CALL_PROCESSING_BASE_URL', 'http://localhost:8003') |
173 | | - |
174 | | - if base_url.startswith('https://'): |
175 | | - websocket_url = base_url.replace('https://', 'wss://') |
176 | | - elif base_url.startswith('http://'): |
177 | | - websocket_url = base_url.replace('http://', 'ws://') |
178 | | - else: |
179 | | - websocket_url = f'wss://{base_url}' |
180 | | - |
181 | | - websocket_url = f'{websocket_url}/webhooks/ws' |
182 | | - |
183 | | - logger.info(f'Exotel inbound - WebSocket URL: {websocket_url}') |
184 | | - |
185 | | - return JSONResponse( |
186 | | - content={ |
187 | | - 'voice_agent_id': str(agent_id), |
188 | | - 'agent_name': agent.get('name', ''), |
189 | | - 'websocket_url': websocket_url, |
190 | | - 'customer_number': from_number, |
191 | | - 'welcome_message': agent.get('welcome_message', ''), |
192 | | - } |
193 | | - ) |
194 | | - |
195 | | - |
196 | 118 | @webhook_router.post('/twiml') |
197 | 119 | async def twiml_endpoint( |
198 | 120 | From: str = Form(...), |
@@ -385,3 +307,149 @@ async def websocket_endpoint( |
385 | 307 | logger.error(f'Error in WebSocket endpoint: {e}', exc_info=True) |
386 | 308 | if websocket.client_state.name != 'DISCONNECTED': |
387 | 309 | await websocket.close(code=1011, reason='Internal error') |
| 310 | + |
| 311 | + |
| 312 | +@webhook_router.websocket('/exotel/inbound/ws') |
| 313 | +@inject |
| 314 | +async def exotel_inbound_websocket( |
| 315 | + websocket: WebSocket, |
| 316 | + voice_agent_cache_service: VoiceAgentCacheService = Depends( |
| 317 | + Provide[ApplicationContainer.voice_agent_cache_service] |
| 318 | + ), |
| 319 | +): |
| 320 | + """ |
| 321 | + Exotel inbound WebSocket endpoint |
| 322 | +
|
| 323 | + Direct WebSocket connection for Exotel AppBazaar voicebot integration. |
| 324 | + Handles bidirectional audio streaming with Pipecat pipeline. |
| 325 | +
|
| 326 | + Parameters are extracted from the Exotel WebSocket stream: |
| 327 | + CallSid: Exotel call identifier |
| 328 | + From: Caller's phone number (E.164 format) |
| 329 | + To: Called phone number (for agent lookup) |
| 330 | + """ |
| 331 | + await websocket.accept() |
| 332 | + logger.info('Exotel WebSocket connection accepted') |
| 333 | + |
| 334 | + try: |
| 335 | + # Parse Exotel connection and extract call data |
| 336 | + runner_args = WebSocketRunnerArguments(websocket=websocket) |
| 337 | + transport_type, call_data = await parse_telephony_websocket( |
| 338 | + runner_args.websocket |
| 339 | + ) |
| 340 | + |
| 341 | + logger.info(f'Auto-detected transport: {transport_type}') |
| 342 | + |
| 343 | + # Verify it's actually Exotel |
| 344 | + if transport_type != 'exotel': |
| 345 | + logger.error(f'Expected Exotel transport, got: {transport_type}') |
| 346 | + await websocket.close( |
| 347 | + code=1008, reason=f'Unexpected transport type: {transport_type}' |
| 348 | + ) |
| 349 | + return |
| 350 | + |
| 351 | + # Extract parameters from Exotel stream |
| 352 | + call_sid = call_data.get('call_id', '') |
| 353 | + from_number = call_data.get('from', '') |
| 354 | + to_number = call_data.get('to', '') |
| 355 | + |
| 356 | + # Mask phone numbers for privacy |
| 357 | + masked_from = f'***{from_number[-4:]}' if len(from_number) > 4 else '****' |
| 358 | + masked_to = f'***{to_number[-4:]}' if len(to_number) > 4 else '****' |
| 359 | + logger.info( |
| 360 | + f'Exotel call: CallSid={call_sid}, From={masked_from}, To={masked_to}' |
| 361 | + ) |
| 362 | + |
| 363 | + if not to_number: |
| 364 | + logger.error('No "to" number found in Exotel call data') |
| 365 | + await websocket.close(code=1008, reason='Missing "to" number in call data') |
| 366 | + return |
| 367 | + |
| 368 | + # Normalize phone numbers to E.164 format (database stores in E.164) |
| 369 | + # Exotel sends Indian numbers as 0xxxxxxxxxx, we need +91xxxxxxxxxx |
| 370 | + normalized_to_number = normalize_indian_phone_number(to_number) |
| 371 | + normalized_from_number = normalize_indian_phone_number(from_number) |
| 372 | + |
| 373 | + # Mask normalized numbers for logging |
| 374 | + masked_normalized_to = ( |
| 375 | + f'***{normalized_to_number[-4:]}' |
| 376 | + if len(normalized_to_number) > 4 |
| 377 | + else '****' |
| 378 | + ) |
| 379 | + |
| 380 | + # Look up agent by inbound number |
| 381 | + logger.info(f'Looking up agent by inbound number: {masked_normalized_to}') |
| 382 | + agent = await voice_agent_cache_service.get_agent_by_inbound_number( |
| 383 | + normalized_to_number |
| 384 | + ) |
| 385 | + |
| 386 | + if not agent: |
| 387 | + logger.error( |
| 388 | + f'No voice agent found for Exotel inbound number: {masked_normalized_to}' |
| 389 | + ) |
| 390 | + await websocket.close( |
| 391 | + code=1008, reason='No voice agent configured for this number' |
| 392 | + ) |
| 393 | + return |
| 394 | + |
| 395 | + voice_agent_id = agent['id'] |
| 396 | + logger.info( |
| 397 | + f'Agent found for Exotel inbound {masked_normalized_to}: {voice_agent_id} ({agent["name"]})' |
| 398 | + ) |
| 399 | + |
| 400 | + # Convert voice_agent_id to UUID |
| 401 | + try: |
| 402 | + agent_uuid = UUID(voice_agent_id) |
| 403 | + except ValueError: |
| 404 | + logger.error(f'Invalid UUID format for voice_agent_id: {voice_agent_id}') |
| 405 | + await websocket.close(code=1008, reason='Invalid voice_agent_id format') |
| 406 | + return |
| 407 | + |
| 408 | + # Fetch all configs from cache with API fallback |
| 409 | + configs = await voice_agent_cache_service.get_all_agent_configs(agent_uuid) |
| 410 | + logger.info('Successfully fetched all configs from cache') |
| 411 | + |
| 412 | + # Create Exotel frame serializer |
| 413 | + serializer = ExotelFrameSerializer( |
| 414 | + stream_sid=call_data['stream_id'], |
| 415 | + call_sid=call_data.get('call_id'), |
| 416 | + ) |
| 417 | + |
| 418 | + # Create FastAPI WebSocket transport |
| 419 | + transport = FastAPIWebsocketTransport( |
| 420 | + websocket=websocket, |
| 421 | + params=FastAPIWebsocketParams( |
| 422 | + audio_in_enabled=True, |
| 423 | + audio_out_enabled=True, |
| 424 | + audio_in_passthrough=True, |
| 425 | + add_wav_header=False, |
| 426 | + vad_analyzer=SileroVADAnalyzer( |
| 427 | + params=VADParams( |
| 428 | + confidence=0.7, |
| 429 | + start_secs=0.2, |
| 430 | + stop_secs=0.2, |
| 431 | + min_volume=0.6, |
| 432 | + ), |
| 433 | + ), |
| 434 | + serializer=serializer, |
| 435 | + ), |
| 436 | + ) |
| 437 | + |
| 438 | + logger.info(f'Starting Pipecat pipeline for Exotel call {call_sid}') |
| 439 | + |
| 440 | + # Run conversation pipeline |
| 441 | + pipecat_service = PipecatService() |
| 442 | + await pipecat_service.run_conversation( |
| 443 | + transport=transport, |
| 444 | + agent_config=configs['agent'], |
| 445 | + llm_config=configs['llm_config'], |
| 446 | + tts_config=configs['tts_config'], |
| 447 | + stt_config=configs['stt_config'], |
| 448 | + tools=configs['tools'], |
| 449 | + customer_number=normalized_from_number, |
| 450 | + ) |
| 451 | + |
| 452 | + except Exception as e: |
| 453 | + logger.error(f'Error in Exotel WebSocket endpoint: {e}', exc_info=True) |
| 454 | + if websocket.client_state.name != 'DISCONNECTED': |
| 455 | + await websocket.close(code=1011, reason='Internal error') |
0 commit comments