Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ This requires the `vector` extension (pgvector) in your Postgres instance.
2. Make sure your MQTT broker is running
3. Run the application: `python main.py`
4. Configure Asterisk dialplan to direct calls to the Stasis application named "satellite"
5. Send an MQTT message to the topic `transcription/control` with payload `{"action":"start", "uniqueid":"[CALL_UNIQUEID]"}` or `{"action":"start", "linkedid":"[CALL_LINKEDID]"}`
6. Stop the transcription by sending an MQTT message to the topic `transcription/control` with payload `{"action":"stop", "uniqueid":"[CALL_UNIQUEID]"}` or `{"action":"stop", "linkedid":"[CALL_LINKEDID]"}`

### REST API

Expand Down
248 changes: 199 additions & 49 deletions asterisk_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,96 @@ def __init__(self, url, app, username, password, mqtt_client, rtp_server):
self.session = None
self.is_shutting_down = False
self.max_reconnect_delay = 30 # Maximum seconds between reconnection attempts
self.pending_transcription_requests = set()

def _build_connector(self, channel_id):
    """Instantiate a DeepgramConnector from the state stored for a channel.

    Everything except the API key (read from the ``DEEPGRAM_API_KEY``
    environment variable) and the MQTT client comes from
    ``self.channels[channel_id]``. The connector's ``uniqueid`` is the
    channel's ``linkedid`` when that is set and truthy, otherwise
    ``channel_id`` itself.

    Raises:
        KeyError: if the channel is not tracked or one of the required
            stream/speaker/language keys has not been stored yet.
    """
    state = self.channels[channel_id]
    connector_kwargs = {
        'deepgram_api_key': os.getenv("DEEPGRAM_API_KEY"),
        'rtp_stream_in': state['rtp_stream_in'],
        'rtp_stream_out': state['rtp_stream_out'],
        'mqtt_client': self.mqtt_client,
        # Prefer the call-wide identifier; fall back to the channel's own id.
        'uniqueid': state.get('linkedid') or channel_id,
        'language': state['language'],
        'speaker_name_in': state['speaker_name_in'],
        'speaker_number_in': state['speaker_number_in'],
        'speaker_name_out': state['speaker_name_out'],
        'speaker_number_out': state['speaker_number_out'],
        # Timing values are optional and default to None when absent.
        'call_elapsed_at_start': state.get('call_elapsed_at_start'),
        'call_start_epoch': state.get('call_start_epoch'),
    }
    return DeepgramConnector(**connector_kwargs)

def _find_channels_for_callid(self, call_id):
"""Resolve a call identifier (linkedid or uniqueid) to active channel IDs."""
if call_id in self.channels:
return [call_id]
return [cid for cid, cdata in self.channels.items() if cdata.get('linkedid') == call_id]

def _extract_call_start_epoch(self, linkedid):
"""
Extract call start epoch seconds from linkedid (e.g. 1771864831.1430).
Returns None when format is not parseable.
"""
if not isinstance(linkedid, str) or linkedid == "":
return None
try:
epoch_part = linkedid.split(".", 1)[0]
epoch_value = int(epoch_part)
if epoch_value > 0:
return float(epoch_value)
except Exception:
return None
return None

async def _get_answered_elapsed_seconds(self, channel_id):
"""
Return elapsed seconds since call answer for this channel, if available.
Falls back to None when Asterisk does not expose the variable.
"""
variable_candidates = [
"CHANNEL(answeredtime)",
"ANSWEREDTIME",
]
for variable in variable_candidates:
value = await self._get_channel_variable(channel_id, variable)
if value is None:
continue
try:
elapsed = float(value)
if elapsed >= 0:
return elapsed
except (TypeError, ValueError):
continue
return None

async def _get_channel_variable(self, channel_id, variable):
    """Fetch a single channel variable through the ARI REST interface.

    Issues ``GET {self.url}/ari/channels/{channel_id}/variable`` on
    ``self.session`` with the variable name as the ``variable`` query
    parameter and returns the ``value`` field of the JSON reply.

    A 404 is expected in some call phases, so it is only logged at debug
    level; any other status >= 400 is logged as an error together with the
    response body. Both cases return None.
    """
    request_kwargs = {"params": {"variable": variable}}
    endpoint = f"{self.url}/ari/channels/{channel_id}/variable"
    async with self.session.request("GET", endpoint, **request_kwargs) as response:
        if response.status < 400:
            data = await response.json()
            return data.get("value")

        if response.status != 404:
            error_text = await response.text()
            logger.error(
                f"ARI variable request failed ({response.status}) "
                f"for channel {channel_id}, variable {variable}: {error_text}"
            )
            return None

        # Missing variable: routine, keep it quiet.
        logger.debug(
            f"ARI variable not found for channel {channel_id}: {variable}"
        )
        return None

async def connect(self):
"""Connect to Asterisk ARI and setup WebSocket for events"""
Expand Down Expand Up @@ -143,6 +233,19 @@ async def _handle_stasis_start(self, event):
self.channels[channel_id]['caller_number'] = channel['caller'].get('number', 'unknown')
self.channels[channel_id]['connected_name'] = channel['connected'].get('name', 'connected')
self.channels[channel_id]['connected_number'] = channel['connected'].get('number', 'unknown')
linkedid = channel.get('linkedid')
if not linkedid:
linkedid = await self._get_channel_variable(channel_id, "CHANNEL(linkedid)")
self.channels[channel_id]['linkedid'] = linkedid or channel_id
self.channels[channel_id]['call_start_epoch'] = self._extract_call_start_epoch(
self.channels[channel_id]['linkedid']
)
self.channels[channel_id]['transcription_requested'] = (
channel_id in self.pending_transcription_requests
or self.channels[channel_id]['linkedid'] in self.pending_transcription_requests
)
self.channels[channel_id]['connector_started'] = False
self.pending_transcription_requests.discard(channel_id)
logger.debug(f"Channel {channel_id} entered Satellite. Details: {channel}")
# Create a snoop channel for in and one for out
for direction in ['in', 'out']:
Expand All @@ -161,22 +264,14 @@ async def _handle_stasis_start(self, event):
logger.debug(f"Snoop channel {snoop_channel_id} created")
try:
# Get connected info using ARI
channel_info = await self._ari_request(
'GET',
f"/channels/{channel_id}/variable",
params={'variable': 'CALLERIDNUMINTERNAL'}
)
if channel_info and 'value' in channel_info and channel_info['value'] != '':
self.channels[channel_id]['connected_number'] = channel_info['value']
logger.debug(f"Updated connected number for channel {channel_id}: {channel_info['value']}")
channel_info = await self._ari_request(
'GET',
f"/channels/{channel_id}/variable",
params={'variable': 'CALLERIDNAMEINTERNAL'}
)
if channel_info and 'value' in channel_info and channel_info['value'] != '':
self.channels[channel_id]['connected_name'] = channel_info['value']
logger.debug(f"Updated connected name for channel {channel_id}: {channel_info['value']}")
connected_number = await self._get_channel_variable(channel_id, "CALLERIDNUMINTERNAL")
if connected_number:
self.channels[channel_id]['connected_number'] = connected_number
logger.debug(f"Updated connected number for channel {channel_id}: {connected_number}")
connected_name = await self._get_channel_variable(channel_id, "CALLERIDNAMEINTERNAL")
if connected_name:
self.channels[channel_id]['connected_name'] = connected_name
logger.debug(f"Updated connected name for channel {channel_id}: {connected_name}")
except Exception as e:
logger.debug(f"connected info not updated for channel {channel_id}: {e}")
if channel_id.startswith("snoop-"):
Expand Down Expand Up @@ -261,21 +356,16 @@ async def _handle_stasis_start(self, event):
speaker_name_in, speaker_name_out = speaker_name_out, speaker_name_in
speaker_number_in, speaker_number_out = speaker_number_out, speaker_number_in

# create a deepgram connector instance
self.channels[original_channel_id]['connector'] = DeepgramConnector(
deepgram_api_key=os.getenv("DEEPGRAM_API_KEY"),
rtp_stream_in=rtp_stream_in,
rtp_stream_out=rtp_stream_out,
mqtt_client=self.mqtt_client,
uniqueid=original_channel_id,
language=self.channels[original_channel_id]['language'],
speaker_name_in=speaker_name_in,
speaker_number_in=speaker_number_in,
speaker_name_out=speaker_name_out,
speaker_number_out=speaker_number_out
)
# start the deepgram connector in background to avoid blocking event loop
asyncio.create_task(self._start_connector(original_channel_id))
self.channels[original_channel_id]['rtp_stream_in'] = rtp_stream_in
self.channels[original_channel_id]['rtp_stream_out'] = rtp_stream_out
self.channels[original_channel_id]['speaker_name_in'] = speaker_name_in
self.channels[original_channel_id]['speaker_number_in'] = speaker_number_in
self.channels[original_channel_id]['speaker_name_out'] = speaker_name_out
self.channels[original_channel_id]['speaker_number_out'] = speaker_number_out

# Start the connector only if a realtime transcription was requested.
if self.channels[original_channel_id].get('transcription_requested'):
asyncio.create_task(self._start_connector(original_channel_id))
except Exception as e:
logger.error(f"Failed to start connector for channel {original_channel_id}: {e}")
await self.close_channel(original_channel_id)
Expand All @@ -290,15 +380,71 @@ async def _handle_stasis_start(self, event):
async def _start_connector(self, channel_id):
    """Start the Deepgram connector for ``channel_id`` in the background.

    Does nothing when the channel is not tracked, when its connector is
    already started (or another invocation is currently starting it), or
    when the RTP streams have not been stored on the channel yet.

    Otherwise the elapsed answered time is fetched if it is still unknown,
    a connector is built if the channel has none, and the connector is
    started. ``connector_started`` is only set when the connector that was
    started is still the one attached to the channel.

    Any exception is logged and, if the channel is still tracked, the
    channel is closed.
    """
    channel = self.channels.get(channel_id)
    if channel is None:
        return

    # 'connector_starting' marks an in-flight start so that two overlapping
    # tasks for the same channel cannot both call connector.start().
    if channel.get('connector_started') or channel.get('connector_starting'):
        return

    if 'rtp_stream_in' not in channel or 'rtp_stream_out' not in channel:
        logger.info(f"Transcription requested for {channel_id} but RTP streams are not ready yet")
        return

    channel['connector_starting'] = True
    try:
        if channel.get('call_elapsed_at_start') is None:
            channel['call_elapsed_at_start'] = await self._get_answered_elapsed_seconds(channel_id)

        # The channel may have been closed while the ARI lookup was pending;
        # building a connector for it would fail on the missing entry.
        if self.channels.get(channel_id) is not channel:
            logger.debug(f"Channel {channel_id} closed before its connector could start")
            return

        connector = channel.get('connector')
        if connector is None:
            connector = self._build_connector(channel_id)
            channel['connector'] = connector

        await connector.start()

        # stop_transcription() may have detached the connector while it was
        # starting; in that case the channel must stay restartable.
        if channel.get('connector') is connector:
            channel['connector_started'] = True
            logger.info(f"Deepgram connector started for channel {channel_id}")
        else:
            logger.debug(f"Connector for channel {channel_id} was detached while starting")
    except Exception as e:
        logger.error(f"Failed to start Deepgram connector for channel {channel_id}: {e}")
        # Close the channel if connector fails to start
        if channel_id in self.channels:
            await self.close_channel(channel_id)
    finally:
        channel.pop('connector_starting', None)

async def start_transcription(self, call_id):
    """Enable realtime transcription for a specific active call.

    ``call_id`` may be a channel uniqueid or a linkedid; it is resolved
    with ``_find_channels_for_callid``. The identifier is always added to
    ``self.pending_transcription_requests``. When no tracked channel
    matches, the request is only queued and the method returns.

    For every matching channel the method marks transcription as
    requested, refreshes ``call_elapsed_at_start`` from Asterisk (None when
    unavailable) and schedules ``_start_connector`` unless the connector is
    already started. Channels that disappear while the method is awaiting
    are skipped instead of raising ``KeyError``.
    """
    self.pending_transcription_requests.add(call_id)
    channel_ids = self._find_channels_for_callid(call_id)
    if not channel_ids:
        logger.info(f"Queued transcription start for call {call_id}")
        return

    for channel_id in channel_ids:
        # An earlier iteration awaited, so this channel may be gone already.
        channel = self.channels.get(channel_id)
        if channel is None:
            continue

        channel['transcription_requested'] = True
        answered_elapsed = await self._get_answered_elapsed_seconds(channel_id)

        # The channel can be closed during the ARI round-trip above.
        if self.channels.get(channel_id) is not channel:
            continue

        channel['call_elapsed_at_start'] = answered_elapsed
        if not channel.get('connector_started'):
            # NOTE(review): the task is not referenced anywhere; asyncio only
            # keeps weak references to tasks, so consider storing it.
            asyncio.create_task(self._start_connector(channel_id))

async def stop_transcription(self, call_id):
    """Turn realtime transcription off for an active call.

    The identifier is removed from ``self.pending_transcription_requests``
    and resolved with ``_find_channels_for_callid``. If nothing matches,
    the request is logged and ignored.

    For each matching channel that is still tracked, the requested flag is
    cleared, the connector (if any) is detached and closed, and
    ``connector_started`` is reset. A failure while closing the connector
    is logged at debug level and does not stop the loop.
    """
    self.pending_transcription_requests.discard(call_id)
    found = self._find_channels_for_callid(call_id)
    if not found:
        logger.info(f"Stop transcription ignored: call {call_id} not found")
        return

    # Lazily pair each id with its current state so the lookup happens at
    # iteration time, after any awaits from previous iterations.
    tracked = ((cid, self.channels.get(cid)) for cid in found)
    for channel_id, state in tracked:
        if state is not None:
            state['transcription_requested'] = False
            detached = state.pop('connector', None)
            if detached is not None:
                try:
                    await detached.close()
                except Exception as e:
                    logger.debug(f"Failed to close connector for channel {channel_id}: {e}")
            state['connector_started'] = False

async def _handle_stasis_end(self, event):
"""Handle channel hangup event"""
channel = event['channel']
Expand Down Expand Up @@ -330,42 +476,46 @@ async def _handle_channel_left_bridge(self, event):
async def close_channel(self, channel_id):
    """Release everything tracked for ``channel_id`` and forget the channel.

    In order: closes the Deepgram connector, deletes the ``in``/``out``
    bridges and external media channels through ARI, ends the RTP streams
    on ``self.rtp_server``, removes the channel from ``self.channels`` and
    drops any pending transcription request stored under ``channel_id``.
    Failures while closing the connector or deleting ARI resources are
    logged at debug level and do not interrupt the teardown.

    Every resource is detached from the channel state *before* the
    corresponding await, so a second ``close_channel`` call that overlaps
    with this one finds nothing left to release instead of raising
    ``KeyError``. Calling it for an unknown channel is a no-op.
    """
    logger.debug(f"close_channel(channel_id={channel_id})")
    channel = self.channels.get(channel_id)
    if channel is not None:
        # Close the deepgram connector
        connector = channel.pop('connector', None)
        if connector is not None:
            try:
                await connector.close()
            except Exception as e:
                logger.debug(f"Failed to close connector for channel {channel_id}: {e}")
        for direction in ['in', 'out']:
            # Remove the bridge
            bridge_id = channel.pop(f'bridge_{direction}', None)
            if bridge_id is not None:
                try:
                    await self._ari_request(
                        'DELETE',
                        f"/bridges/{bridge_id}"
                    )
                except Exception as e:
                    logger.debug(f"Failed to delete bridge {bridge_id}: {e}")
        for direction in ['in', 'out']:
            # Remove the external media channel
            media_channel_id = channel.pop(f'external_media_channel_{direction}', None)
            if media_channel_id is not None:
                try:
                    await self._ari_request(
                        'DELETE',
                        f"/channels/{media_channel_id}"
                    )
                except Exception as e:
                    logger.debug(f"Failed to delete external media channel {media_channel_id}: {e}")
        for direction in ['in', 'out']:
            # Remove the RTP stream
            source_port = channel.pop(f'rtp_source_port_{direction}', None)
            if source_port is not None:
                self.rtp_server.end_stream(source_port)
            channel.pop(f'rtp_stream_{direction}', None)
        # pop() rather than del: an overlapping close may have removed it.
        self.channels.pop(channel_id, None)
        self.pending_transcription_requests.discard(channel_id)

async def _handle_channel_hangup(self, event):
"""Handle channel hangup event"""
Expand Down
Loading
Loading