Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import json
import os
from uuid import UUID
from fastapi import APIRouter, WebSocket, Query, Depends, Form, Request
from fastapi.responses import Response, JSONResponse
from call_processing.utils import normalize_indian_phone_number
from fastapi import APIRouter, WebSocket, Query, Depends, Form
from fastapi.responses import Response
from twilio.twiml.voice_response import VoiceResponse, Connect, Stream
from call_processing.log.logger import logger
from dependency_injector.wiring import inject, Provide
Expand Down Expand Up @@ -114,85 +115,6 @@ async def inbound_webhook(
return Response(content=twiml_xml, media_type='application/xml')


@webhook_router.post('/exotel/inbound')
@inject
async def exotel_inbound_webhook(
request: Request,
voice_agent_cache_service: VoiceAgentCacheService = Depends(
Provide[ApplicationContainer.voice_agent_cache_service]
),
):
"""
Exotel inbound webhook endpoint

Called by Exotel App Bazaar passthrough when an inbound call is received.
Looks up the voice agent by inbound phone number and returns WebSocket URL
along with agent info as JSON.

Expected request body (form or JSON):
CallSid: Exotel call identifier
From: Caller's phone number
To: Called phone number (the Exotel virtual number)
"""
# Parse body (support both form-encoded and JSON)
content_type = request.headers.get('content-type', '')
if 'application/json' in content_type:
data = await request.json()
else:
form = await request.form()
data = dict(form)

call_sid = data.get('CallSid', '')
from_number = data.get('From', '')
to_number = data.get('To', '')

# Mask phone numbers for privacy
masked_from = f'***{from_number[-4:]}' if len(from_number) > 4 else '****'
masked_to = f'***{to_number[-4:]}' if len(to_number) > 4 else '****'
logger.info(
f'Exotel inbound call received: From={masked_from}, To={masked_to}, CallSid={call_sid}'
)

# Look up agent by inbound number
agent = await voice_agent_cache_service.get_agent_by_inbound_number(to_number)

if not agent:
logger.error(f'No voice agent found for inbound number: {to_number}')
return JSONResponse(
status_code=404,
content={'error': 'No voice agent configured for this number'},
)

agent_id = agent['id']
logger.info(
f'Agent found for inbound number {to_number}: {agent_id} ({agent["name"]})'
)

# Build WebSocket URL
base_url = os.getenv('CALL_PROCESSING_BASE_URL', 'http://localhost:8003')

if base_url.startswith('https://'):
websocket_url = base_url.replace('https://', 'wss://')
elif base_url.startswith('http://'):
websocket_url = base_url.replace('http://', 'ws://')
else:
websocket_url = f'wss://{base_url}'

websocket_url = f'{websocket_url}/webhooks/ws'

logger.info(f'Exotel inbound - WebSocket URL: {websocket_url}')

return JSONResponse(
content={
'voice_agent_id': str(agent_id),
'agent_name': agent.get('name', ''),
'websocket_url': websocket_url,
'customer_number': from_number,
'welcome_message': agent.get('welcome_message', ''),
}
)


@webhook_router.post('/twiml')
async def twiml_endpoint(
From: str = Form(...),
Expand Down Expand Up @@ -385,3 +307,149 @@ async def websocket_endpoint(
logger.error(f'Error in WebSocket endpoint: {e}', exc_info=True)
if websocket.client_state.name != 'DISCONNECTED':
await websocket.close(code=1011, reason='Internal error')


@webhook_router.websocket('/exotel/inbound/ws')
@inject
async def exotel_inbound_websocket(
websocket: WebSocket,
voice_agent_cache_service: VoiceAgentCacheService = Depends(
Provide[ApplicationContainer.voice_agent_cache_service]
),
):
"""
Exotel inbound WebSocket endpoint

Direct WebSocket connection for Exotel AppBazaar voicebot integration.
Handles bidirectional audio streaming with Pipecat pipeline.

Parameters are extracted from the Exotel WebSocket stream:
CallSid: Exotel call identifier
From: Caller's phone number (E.164 format)
To: Called phone number (for agent lookup)
"""
await websocket.accept()
logger.info('Exotel WebSocket connection accepted')

try:
# Parse Exotel connection and extract call data
runner_args = WebSocketRunnerArguments(websocket=websocket)
transport_type, call_data = await parse_telephony_websocket(
runner_args.websocket
)

logger.info(f'Auto-detected transport: {transport_type}')

# Verify it's actually Exotel
if transport_type != 'exotel':
logger.error(f'Expected Exotel transport, got: {transport_type}')
await websocket.close(
code=1008, reason=f'Unexpected transport type: {transport_type}'
)
return

# Extract parameters from Exotel stream
call_sid = call_data.get('call_id', '')
from_number = call_data.get('from', '')
to_number = call_data.get('to', '')

# Mask phone numbers for privacy
masked_from = f'***{from_number[-4:]}' if len(from_number) > 4 else '****'
masked_to = f'***{to_number[-4:]}' if len(to_number) > 4 else '****'
logger.info(
f'Exotel call: CallSid={call_sid}, From={masked_from}, To={masked_to}'
)

if not to_number:
logger.error('No "to" number found in Exotel call data')
await websocket.close(code=1008, reason='Missing "to" number in call data')
return

# Normalize phone numbers to E.164 format (database stores in E.164)
# Exotel sends Indian numbers as 0xxxxxxxxxx, we need +91xxxxxxxxxx
normalized_to_number = normalize_indian_phone_number(to_number)
normalized_from_number = normalize_indian_phone_number(from_number)

# Mask normalized numbers for logging
masked_normalized_to = (
f'***{normalized_to_number[-4:]}'
if len(normalized_to_number) > 4
else '****'
)

# Look up agent by inbound number
logger.info(f'Looking up agent by inbound number: {masked_normalized_to}')
agent = await voice_agent_cache_service.get_agent_by_inbound_number(
normalized_to_number
)

if not agent:
logger.error(
f'No voice agent found for Exotel inbound number: {masked_normalized_to}'
)
await websocket.close(
code=1008, reason='No voice agent configured for this number'
)
return

voice_agent_id = agent['id']
logger.info(
f'Agent found for Exotel inbound {masked_normalized_to}: {voice_agent_id} ({agent["name"]})'
)

# Convert voice_agent_id to UUID
try:
agent_uuid = UUID(voice_agent_id)
except ValueError:
logger.error(f'Invalid UUID format for voice_agent_id: {voice_agent_id}')
await websocket.close(code=1008, reason='Invalid voice_agent_id format')
return

# Fetch all configs from cache with API fallback
configs = await voice_agent_cache_service.get_all_agent_configs(agent_uuid)
logger.info('Successfully fetched all configs from cache')

# Create Exotel frame serializer
serializer = ExotelFrameSerializer(
stream_sid=call_data['stream_id'],
call_sid=call_data.get('call_id'),
)
Comment on lines +413 to +416
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

call_data['stream_id'] will raise KeyError if missing.

Line 414 uses direct dict access for stream_id, while line 415 uses .get('call_id'). If the Exotel stream message is malformed and stream_id is absent, this will raise an unhandled KeyError (caught only by the broad except Exception). Consider using .get() with validation or adding an explicit check, consistent with how other fields are handled.

Suggested improvement
+        stream_id = call_data.get('stream_id')
+        if not stream_id:
+            logger.error('No stream_id found in Exotel call data')
+            await websocket.close(code=1008, reason='Missing stream_id in call data')
+            return
+
         # Create Exotel frame serializer
         serializer = ExotelFrameSerializer(
-            stream_sid=call_data['stream_id'],
+            stream_sid=stream_id,
             call_sid=call_data.get('call_id'),
         )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
serializer = ExotelFrameSerializer(
stream_sid=call_data['stream_id'],
call_sid=call_data.get('call_id'),
)
stream_id = call_data.get('stream_id')
if not stream_id:
logger.error('No stream_id found in Exotel call data')
await websocket.close(code=1008, reason='Missing stream_id in call data')
return
serializer = ExotelFrameSerializer(
stream_sid=stream_id,
call_sid=call_data.get('call_id'),
)
🤖 Prompt for AI Agents
In
`@wavefront/server/apps/call_processing/call_processing/controllers/webhook_controller.py`
around lines 413 - 416, The code directly indexes call_data['stream_id'] when
constructing ExotelFrameSerializer which will raise KeyError if stream_id is
missing; change to use call_data.get('stream_id') and add an explicit presence
check before creating ExotelFrameSerializer (e.g., validate stream_id is not
None/empty) and return or raise a controlled error/response if absent so
behavior matches how call_id is handled and avoids an unhandled KeyError in the
serializer construction.


# Create FastAPI WebSocket transport
transport = FastAPIWebsocketTransport(
websocket=websocket,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
audio_in_passthrough=True,
add_wav_header=False,
vad_analyzer=SileroVADAnalyzer(
params=VADParams(
confidence=0.7,
start_secs=0.2,
stop_secs=0.2,
min_volume=0.6,
),
),
serializer=serializer,
),
)

logger.info(f'Starting Pipecat pipeline for Exotel call {call_sid}')

# Run conversation pipeline
pipecat_service = PipecatService()
await pipecat_service.run_conversation(
transport=transport,
agent_config=configs['agent'],
llm_config=configs['llm_config'],
tts_config=configs['tts_config'],
stt_config=configs['stt_config'],
tools=configs['tools'],
customer_number=normalized_from_number,
)

except Exception as e:
logger.error(f'Error in Exotel WebSocket endpoint: {e}', exc_info=True)
if websocket.client_state.name != 'DISCONNECTED':
await websocket.close(code=1011, reason='Internal error')
40 changes: 40 additions & 0 deletions wavefront/server/apps/call_processing/call_processing/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,43 @@ def get_current_ist_time_str() -> str:
"- If the user says 'Monday', 'next Friday', etc., calculate the correct date relative to today.\n"
"- Always convert relative dates (like 'tomorrow', 'next week') to specific dates (YYYY-MM-DD) when calling tools."
)


def normalize_indian_phone_number(phone_number: str) -> str:
"""
Normalize Indian phone number to E.164 format.

Converts Indian national format (0xxxxxxxxxx) to E.164 format (+91xxxxxxxxxx).

Args:
phone_number: Phone number in various formats

Returns:
Phone number in E.164 format (+91xxxxxxxxxx)

Examples:
"01234567890" -> "+911234567890"
"+911234567890" -> "+911234567890"
"911234567890" -> "+911234567890"
"""
# Remove any whitespace
phone_number = phone_number.strip()

# If already in E.164 format with +91, return as is
if phone_number.startswith('+91'):
return phone_number

# If starts with 91 but no +, add +
if phone_number.startswith('91') and len(phone_number) >= 12:
return f'+{phone_number}'

# If starts with 0 (Indian national format), replace with +91
if phone_number.startswith('0'):
return f'+91{phone_number[1:]}'

# If it's just the number without country code, add +91
if len(phone_number) == 10:
return f'+91{phone_number}'

# Return as is if we can't determine format
return phone_number
Loading