-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasterisk_shim_server.py
More file actions
312 lines (267 loc) · 9.39 KB
/
asterisk_shim_server.py
File metadata and controls
312 lines (267 loc) · 9.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
"""
src/servers/asterisk_shim_server.py
FastAPI server for the Asterisk shim backend. Intended to be run on the same machine as Asterisk server.
Bridges Asterisk ExternalMedia (RTP μ-law) to a single WebSocket connected to ECS at /voice/voice (internal ALB).
The shim forwards audio frames and simple call events between Asterisk and ECS.
ECS owns the AI voice agent session and recordings.
Topology:
- Asterisk PSTN channel ↔ ARI mixing bridge ↔ ExternalMedia channel
- ExternalMedia UDP/RTP ↔ shim (this process)
- Shim ↔ WSS to ECS /voice/voice (internal ALB)
- ECS ↔ AI voice agent server
"""
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from ari.asterisk_ari_supervisor import ARISupervisor
from env_config import EnvConfig
from utils.logger import get_logger
# Constants _______________________________________________________________________________________
# FastAPI application object for the shim; debug mode is enabled and the
# interactive API docs are configured at /docs.
app = FastAPI(title=__name__, version="1.0.0", debug=True, docs_url="/docs")

# Module-level logger obtained from the project's logging helper.
logger = get_logger(name=__name__)
# CORS ____________________________________________________________________________________________
# Sample CORS policy (very permissive): every origin, method and request header
# is allowed, credentials are allowed, and every response header is exposed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"],
    allow_credentials=True,
)
# Global supervisor instance ______________________________________________________________________
# Current ARISupervisor instance; None until _start_supervisor() has created one.
_supervisor: ARISupervisor | None = None
# asyncio task running the current supervisor's run() coroutine; None until
# _start_supervisor() has scheduled it.
_sup_task: asyncio.Task | None = None
# Helper functions ________________________________________________________________________________
# Strong references to running supervisor-monitor tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_monitor_tasks: set[asyncio.Task] = set()


async def _start_supervisor() -> None:
    """
    Start or restart the ARI supervisor.

    Stops and cancels any existing supervisor, creates a fresh ``ARISupervisor``
    whose ``run()`` coroutine is scheduled as the ``ari-supervisor`` task, and
    spawns a ``supervisor-monitor`` task that restarts the supervisor whenever
    that task finishes.
    """
    global _supervisor, _sup_task

    # stop existing supervisor if running; a failure while tearing down the old
    # instance must not prevent the new one from starting
    if _supervisor:
        try:
            await _supervisor.stop()
        except Exception as e:
            logger.error("Error stopping previous ARI supervisor: %s", e, exc_info=True)
    if _sup_task and not _sup_task.done():
        _sup_task.cancel()
        try:
            await _sup_task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error("Previous ARI supervisor task failed while stopping: %s", e, exc_info=True)

    # start new supervisor
    logger.info("Starting ARI supervisor")
    _supervisor = ARISupervisor()
    _sup_task = asyncio.create_task(_supervisor.run(), name="ari-supervisor")

    # monitor supervisor task and restart if it dies
    async def monitor_supervisor() -> None:
        """
        Monitor the supervisor task and restart it if it dies.

        Polls once per second. When the supervisor task is done, logs why,
        waits 2 seconds and calls ``_start_supervisor()``. A successful restart
        spawns its own monitor, so this one returns; a failed restart is logged
        and retried on the next poll instead of ending the monitoring.
        """
        while True:
            try:
                if _sup_task and _sup_task.done():
                    if _sup_task.cancelled():
                        logger.warning("ARI supervisor was cancelled")
                    elif (exception := _sup_task.exception()) is not None:
                        logger.error("ARI supervisor died with exception: %s", exception)
                    else:
                        logger.warning("ARI supervisor exited unexpectedly")
                    logger.info("Restarting ARI supervisor in 2 seconds...")
                    await asyncio.sleep(2.0)
                    await _start_supervisor()
                    return
            except Exception as e:
                # keep the loop alive so the restart is attempted again
                logger.error("Error in supervisor monitor: %s", e, exc_info=True)
            await asyncio.sleep(1.0)

    # create and start the supervisor monitor task, holding a strong reference
    # until it completes
    monitor_task = asyncio.create_task(monitor_supervisor(), name="supervisor-monitor")
    _monitor_tasks.add(monitor_task)
    monitor_task.add_done_callback(_monitor_tasks.discard)
# Startup/Shutdown ________________________________________________________________________________
@app.on_event("startup")
async def startup_event() -> None:
    """
    Application startup hook.

    Logs the start-up and delegates to ``_start_supervisor()`` to bring up the
    ARI supervisor.
    """
    logger.info("Starting ARI supervisor")
    await _start_supervisor()
@app.on_event("shutdown")
async def shutdown_event() -> None:
    """
    Stop the ARI supervisor and cancel its tasks on application shutdown.

    Order of operations:
    1. Cancel every running ``supervisor-monitor`` task, so the monitor cannot
       observe the finished supervisor task and start a new supervisor while
       the application is shutting down.
    2. Ask the supervisor to stop.
    3. Cancel and await the supervisor task, even if step 2 raised.

    Raises:
        Exception: Whatever ``_supervisor.stop()`` raises is propagated after
            the supervisor task has been cancelled.
    """
    logger.info("Shutting down ARI supervisor on application shutdown")

    # the monitor is located by the task name it is created with
    current = asyncio.current_task()
    monitors = [
        task
        for task in asyncio.all_tasks()
        if task is not current and task.get_name() == "supervisor-monitor"
    ]
    for task in monitors:
        task.cancel()
    if monitors:
        # return_exceptions=True: the monitors' CancelledError is collected, not raised
        await asyncio.gather(*monitors, return_exceptions=True)

    try:
        if _supervisor:
            await _supervisor.stop()
    finally:
        if _sup_task:
            _sup_task.cancel()
            try:
                await _sup_task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                # a task that already failed re-raises its old exception when
                # awaited; log it rather than failing the shutdown hook
                logger.error("ARI supervisor task ended with error: %s", e, exc_info=True)
# HTTP endpoints __________________________________________________________________________________
@app.get("/health")
async def health() -> JSONResponse:
    """
    Return health, supervisor task state, configuration and supervisor status.

    Returns:
        JSONResponse: HTTP 503 with ``{"status": "error", ...}`` when the
            supervisor has not been initialized, so that status-code based
            health checks fail. Otherwise HTTP 200 with ``"status": "ok"``,
            the supervisor task state, selected configuration values and the
            fields returned by the supervisor's ``get_status()``.
    """
    logger.debug("Health endpoint called")
    if not _supervisor:
        logger.error("Supervisor not initialized")
        return JSONResponse(
            status_code=503,
            content={
                "status": "error",
                "message": "Supervisor not initialized"
            },
        )

    status = _supervisor.get_status()

    # Task.exception() only raises CancelledError for a cancelled task, which is
    # handled by the cancelled() branch first, so no try/except is required.
    if not _sup_task:
        task_status = "unknown"
    elif not _sup_task.done():
        task_status = "running"
    elif _sup_task.cancelled():
        task_status = "cancelled"
    elif (exception := _sup_task.exception()) is not None:
        task_status = f"failed: {exception}"
    else:
        task_status = "completed"

    # return the health and the supervisor status; keys from get_status() are
    # merged last and therefore win on a name clash
    return JSONResponse({
        "status": "ok",
        "supervisor_task_status": task_status,
        "config": {
            "ari_base": EnvConfig.ARI_BASE,
            "ari_app": EnvConfig.ARI_APP,
            "external_media_host": EnvConfig.EXTERNAL_MEDIA_HOST,
            "ecs_media_wss_url": EnvConfig.ECS_MEDIA_WSS_URL,
        },
        **status
    })
@app.post("/reconnect")
async def force_reconnect() -> JSONResponse:
    """
    Trigger a forced ARI reconnection on the supervisor (debugging aid).

    Returns:
        JSONResponse: ``{"status": "ok", "message": "Reconnection triggered"}``
            once the supervisor's ``force_reconnect()`` has returned.

    Raises:
        HTTPException: 500 when the supervisor is not initialized.
    """
    if _supervisor:
        logger.info("Supervisor reconnection triggered")
        await _supervisor.force_reconnect()
        logger.info("Supervisor reconnection completed")
        payload = {"status": "ok", "message": "Reconnection triggered"}
        return JSONResponse(payload)

    logger.error("Supervisor not initialized")
    raise HTTPException(status_code=500, detail="Supervisor not initialized")
@app.post("/calls/{call_id}/hangup")
async def hangup_call(call_id: str) -> JSONResponse:
    """
    Hang up a call by deleting the PSTN channel via ARI.

    Args:
        call_id: The ID of the call (ARI channel) to hang up.

    Returns:
        JSONResponse: A JSON response containing the status and message.

    Raises:
        HTTPException: 500 when the supervisor or its HTTP session is not
            available or the ARI request fails; 404 when ARI reports that the
            channel does not exist.
    """
    # function-scope import so this block does not depend on the file's import list
    from urllib.parse import quote

    if not _supervisor or not _supervisor._http:
        logger.error("Supervisor not initialized")
        raise HTTPException(
            status_code=500,
            detail="Supervisor not initialized"
        )

    # call_id comes straight from the request path; percent-encode it so that
    # characters such as "?" or "#" cannot change the path or inject a query
    # string into the request sent to ARI
    url = f"{EnvConfig.ARI_BASE}/channels/{quote(call_id, safe='')}"
    try:
        async with _supervisor._http.delete(url=url) as r:
            if r.status == 404:
                logger.error("Channel not found")
                raise HTTPException(
                    status_code=404,
                    detail="Channel not found"
                )
            r.raise_for_status()
            logger.info("Hung up call %s via ARI", call_id)
            return JSONResponse({
                "status": "ok",
                "message": f"Call {call_id} hung up"
            })
    except HTTPException:
        # let the 404 raised above through unchanged
        raise
    except Exception as e:
        logger.error("Error hanging up call %s: %s", call_id, e, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error hanging up call: {e}"
        ) from e
@app.post("/calls/originate")
async def originate_call(
    endpoint: str,
    context: str = "default",
    extension: str = "s",
    priority: int = 1,
    timeout: int = 30,
) -> JSONResponse:
    """
    Originate a new call via ARI (optional endpoint).

    The arguments are forwarded as query parameters of a POST to the ARI
    ``/channels`` resource, together with the configured ARI application name.

    Args:
        endpoint: The endpoint to originate the call to.
        context: The dialplan context passed to ARI.
        extension: The extension passed to ARI.
        priority: The priority passed to ARI.
        timeout: The timeout passed to ARI.

    Returns:
        JSONResponse: A JSON response containing the status, call_id, and message.

    Raises:
        HTTPException: 500 when the supervisor or its HTTP session is not
            available, or when the ARI request fails or its response has no
            ``id`` field.
    """
    if not _supervisor or not _supervisor._http:
        logger.error("Supervisor not initialized")
        raise HTTPException(
            status_code=500,
            detail="Supervisor not initialized"
        )

    params = {
        "endpoint": endpoint,
        "context": context,
        "extension": extension,
        "priority": priority,
        "timeout": timeout,
        "app": EnvConfig.ARI_APP,
    }
    try:
        async with _supervisor._http.post(
            url=f"{EnvConfig.ARI_BASE}/channels",
            params=params
        ) as r:
            r.raise_for_status()
            channel = await r.json()
            call_id = channel["id"]
            logger.info("Originated call %s to %s", call_id, endpoint)
            return JSONResponse({
                "status": "ok",
                "call_id": call_id,
                "message": f"Call originated to {endpoint}"
            })
    except Exception as e:
        # log the traceback and keep the original error as the cause
        logger.error("Error originating call to %s: %s", endpoint, e, exc_info=True)
        raise HTTPException(
            status_code=500,
            detail=f"Error originating call: {e}"
        ) from e