From b1a8d1fc8de7a23b28446de736a008b4c152a385 Mon Sep 17 00:00:00 2001 From: Tommaso Ascani Date: Thu, 26 Feb 2026 12:11:26 +0100 Subject: [PATCH 1/2] feat(transcription): implement start and stop control for realtime transcription via MQTT --- asterisk_bridge.py | 248 +++++++++++++++++++++++++++++++++--------- deepgram_connector.py | 36 +++++- main.py | 18 ++- 3 files changed, 248 insertions(+), 54 deletions(-) diff --git a/asterisk_bridge.py b/asterisk_bridge.py index 4e31dcb..52415e4 100644 --- a/asterisk_bridge.py +++ b/asterisk_bridge.py @@ -37,6 +37,96 @@ def __init__(self, url, app, username, password, mqtt_client, rtp_server): self.session = None self.is_shutting_down = False self.max_reconnect_delay = 30 # Maximum seconds between reconnection attempts + self.pending_transcription_requests = set() + + def _build_connector(self, channel_id): + """Create a new Deepgram connector instance for a channel.""" + channel = self.channels[channel_id] + return DeepgramConnector( + deepgram_api_key=os.getenv("DEEPGRAM_API_KEY"), + rtp_stream_in=channel['rtp_stream_in'], + rtp_stream_out=channel['rtp_stream_out'], + mqtt_client=self.mqtt_client, + uniqueid=channel.get('linkedid') or channel_id, + language=channel['language'], + speaker_name_in=channel['speaker_name_in'], + speaker_number_in=channel['speaker_number_in'], + speaker_name_out=channel['speaker_name_out'], + speaker_number_out=channel['speaker_number_out'], + call_elapsed_at_start=channel.get('call_elapsed_at_start'), + call_start_epoch=channel.get('call_start_epoch') + ) + + def _find_channels_for_callid(self, call_id): + """Resolve a call identifier (linkedid or uniqueid) to active channel IDs.""" + if call_id in self.channels: + return [call_id] + return [cid for cid, cdata in self.channels.items() if cdata.get('linkedid') == call_id] + + def _extract_call_start_epoch(self, linkedid): + """ + Extract call start epoch seconds from linkedid (e.g. 1771864831.1430). + Returns None when format is not parseable. 
+ """ + if not isinstance(linkedid, str) or linkedid == "": + return None + try: + epoch_part = linkedid.split(".", 1)[0] + epoch_value = int(epoch_part) + if epoch_value > 0: + return float(epoch_value) + except Exception: + return None + return None + + async def _get_answered_elapsed_seconds(self, channel_id): + """ + Return elapsed seconds since call answer for this channel, if available. + Falls back to None when Asterisk does not expose the variable. + """ + variable_candidates = [ + "CHANNEL(answeredtime)", + "ANSWEREDTIME", + ] + for variable in variable_candidates: + value = await self._get_channel_variable(channel_id, variable) + if value is None: + continue + try: + elapsed = float(value) + if elapsed >= 0: + return elapsed + except (TypeError, ValueError): + continue + return None + + async def _get_channel_variable(self, channel_id, variable): + """ + Read an ARI channel variable, returning None when it does not exist. + Missing variables are expected in some call phases and should not be noisy. 
+ """ + url = f"{self.url}/ari/channels/{channel_id}/variable" + async with self.session.request( + "GET", + url, + params={"variable": variable}, + ) as response: + if response.status == 404: + logger.debug( + f"ARI variable not found for channel {channel_id}: {variable}" + ) + return None + + if response.status >= 400: + error_text = await response.text() + logger.error( + f"ARI variable request failed ({response.status}) " + f"for channel {channel_id}, variable {variable}: {error_text}" + ) + return None + + data = await response.json() + return data.get("value") async def connect(self): """Connect to Asterisk ARI and setup WebSocket for events""" @@ -143,6 +233,19 @@ async def _handle_stasis_start(self, event): self.channels[channel_id]['caller_number'] = channel['caller'].get('number', 'unknown') self.channels[channel_id]['connected_name'] = channel['connected'].get('name', 'connected') self.channels[channel_id]['connected_number'] = channel['connected'].get('number', 'unknown') + linkedid = channel.get('linkedid') + if not linkedid: + linkedid = await self._get_channel_variable(channel_id, "CHANNEL(linkedid)") + self.channels[channel_id]['linkedid'] = linkedid or channel_id + self.channels[channel_id]['call_start_epoch'] = self._extract_call_start_epoch( + self.channels[channel_id]['linkedid'] + ) + self.channels[channel_id]['transcription_requested'] = ( + channel_id in self.pending_transcription_requests + or self.channels[channel_id]['linkedid'] in self.pending_transcription_requests + ) + self.channels[channel_id]['connector_started'] = False + self.pending_transcription_requests.discard(channel_id) logger.debug(f"Channel {channel_id} entered Satellite. 
Details: {channel}") # Create a snoop channel for in and one for out for direction in ['in', 'out']: @@ -161,22 +264,14 @@ async def _handle_stasis_start(self, event): logger.debug(f"Snoop channel {snoop_channel_id} created") try: # Get connected info using ARI - channel_info = await self._ari_request( - 'GET', - f"/channels/{channel_id}/variable", - params={'variable': 'CALLERIDNUMINTERNAL'} - ) - if channel_info and 'value' in channel_info and channel_info['value'] != '': - self.channels[channel_id]['connected_number'] = channel_info['value'] - logger.debug(f"Updated connected number for channel {channel_id}: {channel_info['value']}") - channel_info = await self._ari_request( - 'GET', - f"/channels/{channel_id}/variable", - params={'variable': 'CALLERIDNAMEINTERNAL'} - ) - if channel_info and 'value' in channel_info and channel_info['value'] != '': - self.channels[channel_id]['connected_name'] = channel_info['value'] - logger.debug(f"Updated connected name for channel {channel_id}: {channel_info['value']}") + connected_number = await self._get_channel_variable(channel_id, "CALLERIDNUMINTERNAL") + if connected_number: + self.channels[channel_id]['connected_number'] = connected_number + logger.debug(f"Updated connected number for channel {channel_id}: {connected_number}") + connected_name = await self._get_channel_variable(channel_id, "CALLERIDNAMEINTERNAL") + if connected_name: + self.channels[channel_id]['connected_name'] = connected_name + logger.debug(f"Updated connected name for channel {channel_id}: {connected_name}") except Exception as e: logger.debug(f"connected info not updated for channel {channel_id}: {e}") if channel_id.startswith("snoop-"): @@ -261,21 +356,16 @@ async def _handle_stasis_start(self, event): speaker_name_in, speaker_name_out = speaker_name_out, speaker_name_in speaker_number_in, speaker_number_out = speaker_number_out, speaker_number_in - # create a deepgram connector instance - self.channels[original_channel_id]['connector'] = 
DeepgramConnector( - deepgram_api_key=os.getenv("DEEPGRAM_API_KEY"), - rtp_stream_in=rtp_stream_in, - rtp_stream_out=rtp_stream_out, - mqtt_client=self.mqtt_client, - uniqueid=original_channel_id, - language=self.channels[original_channel_id]['language'], - speaker_name_in=speaker_name_in, - speaker_number_in=speaker_number_in, - speaker_name_out=speaker_name_out, - speaker_number_out=speaker_number_out - ) - # start the deepgram connector in background to avoid blocking event loop - asyncio.create_task(self._start_connector(original_channel_id)) + self.channels[original_channel_id]['rtp_stream_in'] = rtp_stream_in + self.channels[original_channel_id]['rtp_stream_out'] = rtp_stream_out + self.channels[original_channel_id]['speaker_name_in'] = speaker_name_in + self.channels[original_channel_id]['speaker_number_in'] = speaker_number_in + self.channels[original_channel_id]['speaker_name_out'] = speaker_name_out + self.channels[original_channel_id]['speaker_number_out'] = speaker_number_out + + # Start the connector only if a realtime transcription was requested. 
+ if self.channels[original_channel_id].get('transcription_requested'): + asyncio.create_task(self._start_connector(original_channel_id)) except Exception as e: logger.error(f"Failed to start connector for channel {original_channel_id}: {e}") await self.close_channel(original_channel_id) @@ -290,15 +380,71 @@ async def _handle_stasis_start(self, event): async def _start_connector(self, channel_id): """Start the Deepgram connector in background""" try: - if channel_id in self.channels and 'connector' in self.channels[channel_id]: - await self.channels[channel_id]['connector'].start() - logger.info(f"Deepgram connector started for channel {channel_id}") + if channel_id not in self.channels: + return + + channel = self.channels[channel_id] + if channel.get('connector_started'): + return + + if 'rtp_stream_in' not in channel or 'rtp_stream_out' not in channel: + logger.info(f"Transcription requested for {channel_id} but RTP streams are not ready yet") + return + + if channel.get('call_elapsed_at_start') is None: + channel['call_elapsed_at_start'] = await self._get_answered_elapsed_seconds(channel_id) + + if 'connector' not in channel: + channel['connector'] = self._build_connector(channel_id) + + await channel['connector'].start() + channel['connector_started'] = True + logger.info(f"Deepgram connector started for channel {channel_id}") except Exception as e: logger.error(f"Failed to start Deepgram connector for channel {channel_id}: {e}") # Close the channel if connector fails to start if channel_id in self.channels: await self.close_channel(channel_id) + async def start_transcription(self, call_id): + """Enable realtime transcription for a specific active call.""" + self.pending_transcription_requests.add(call_id) + channel_ids = self._find_channels_for_callid(call_id) + if not channel_ids: + logger.info(f"Queued transcription start for call {call_id}") + return + + for channel_id in channel_ids: + self.channels[channel_id]['transcription_requested'] = True + 
answered_elapsed = await self._get_answered_elapsed_seconds(channel_id) + if answered_elapsed is not None: + self.channels[channel_id]['call_elapsed_at_start'] = answered_elapsed + else: + self.channels[channel_id]['call_elapsed_at_start'] = None + if not self.channels[channel_id].get('connector_started'): + asyncio.create_task(self._start_connector(channel_id)) + + async def stop_transcription(self, call_id): + """Disable realtime transcription for a specific active call.""" + self.pending_transcription_requests.discard(call_id) + channel_ids = self._find_channels_for_callid(call_id) + if not channel_ids: + logger.info(f"Stop transcription ignored: call {call_id} not found") + return + + for channel_id in channel_ids: + channel = self.channels.get(channel_id) + if channel is None: + continue + channel['transcription_requested'] = False + connector = channel.pop('connector', None) + if connector is not None: + try: + await connector.close() + except Exception as e: + logger.debug(f"Failed to close connector for channel {channel_id}: {e}") + channel['connector_started'] = False + async def _handle_stasis_end(self, event): """Handle channel hangup event""" channel = event['channel'] @@ -330,42 +476,46 @@ async def _handle_channel_left_bridge(self, event): async def close_channel(self, channel_id): """Close a channel""" logger.debug(f"close_channel(channel_id={channel_id})") - if channel_id in self.channels: + channel = self.channels.get(channel_id) + if channel is not None: # Close the deepgram connector - if 'connector' in self.channels[channel_id]: + connector = channel.pop('connector', None) + if connector is not None: try: - await self.channels[channel_id]['connector'].close() + await connector.close() except Exception as e: logger.debug(f"Failed to close connector for channel {channel_id}: {e}") - del self.channels[channel_id][f'connector'] for direction in ['in', 'out']: # Remove the bridge - if f'bridge_{direction}' in self.channels[channel_id]: + if 
f'bridge_{direction}' in channel: try: await self._ari_request( 'DELETE', - f"/bridges/{self.channels[channel_id][f'bridge_{direction}']}" + f"/bridges/{channel[f'bridge_{direction}']}" ) except Exception as e: - logger.debug(f"Failed to delete bridge {self.channels[channel_id][f'bridge_{direction}']}: {e}") - del self.channels[channel_id][f'bridge_{direction}'] + logger.debug(f"Failed to delete bridge {channel[f'bridge_{direction}']}: {e}") + del channel[f'bridge_{direction}'] for direction in ['in', 'out']: # Remove the external media channel - if f'external_media_channel_{direction}' in self.channels[channel_id]: + if f'external_media_channel_{direction}' in channel: try: await self._ari_request( 'DELETE', - f"/channels/{self.channels[channel_id][f'external_media_channel_{direction}']}" + f"/channels/{channel[f'external_media_channel_{direction}']}" ) except Exception as e: - logger.debug(f"Failed to delete external media channel {self.channels[channel_id][f'external_media_channel_{direction}']}: {e}") - del self.channels[channel_id][f'external_media_channel_{direction}'] + logger.debug(f"Failed to delete external media channel {channel[f'external_media_channel_{direction}']}: {e}") + del channel[f'external_media_channel_{direction}'] for direction in ['in', 'out']: # Remove the RTP stream - if f'rtp_source_port_{direction}' in self.channels[channel_id]: - self.rtp_server.end_stream(self.channels[channel_id][f'rtp_source_port_{direction}']) - del self.channels[channel_id][f'rtp_source_port_{direction}'] + if f'rtp_source_port_{direction}' in channel: + self.rtp_server.end_stream(channel[f'rtp_source_port_{direction}']) + del channel[f'rtp_source_port_{direction}'] + if f'rtp_stream_{direction}' in channel: + del channel[f'rtp_stream_{direction}'] del self.channels[channel_id] + self.pending_transcription_requests.discard(channel_id) async def _handle_channel_hangup(self, event): """Handle channel hangup event""" diff --git a/deepgram_connector.py 
b/deepgram_connector.py index 62bb0d3..844771b 100644 --- a/deepgram_connector.py +++ b/deepgram_connector.py @@ -5,6 +5,7 @@ import asyncio import json import logging +import time import numpy as np from deepgram import ( DeepgramClient, @@ -52,8 +53,22 @@ def __init__(self, deepgram_api_key, rtp_stream_in, rtp_stream_out, mqtt_client, self.complete_call = [] self._close_started = False self._close_lock = asyncio.Lock() + self._first_transcript_logged = False + self.call_elapsed_at_start_override = kwargs.get("call_elapsed_at_start") + self.call_start_epoch = kwargs.get("call_start_epoch") + self.call_elapsed_at_start = 0.0 + self.transcription_start_monotonic = None async def start(self): + if isinstance(self.call_elapsed_at_start_override, (int, float)) and self.call_elapsed_at_start_override >= 0: + self.call_elapsed_at_start = float(self.call_elapsed_at_start_override) + elif isinstance(self.call_start_epoch, (int, float)) and self.call_start_epoch > 0: + self.call_elapsed_at_start = max(0.0, time.time() - float(self.call_start_epoch)) + logger.info( + f"Transcription timing base for {self.uniqueid}: " + f"call_elapsed_at_start={self.call_elapsed_at_start:.3f}s" + ) + deepgram: DeepgramClient = DeepgramClient(self.deepgram_api_key) self.dg_connection = deepgram.listen.asyncwebsocket.v("1") self.dg_connection.on(LiveTranscriptionEvents.Transcript, self.on_message) @@ -79,6 +94,7 @@ async def start(self): logger.error(f"Failed to start Deepgram connection for {self.uniqueid}") return + self.transcription_start_monotonic = time.monotonic() self.connected = True self.read_audio_from_rtp_task = asyncio.create_task(self.read_audio_from_rtp()) self.send_audio_to_deepgram_task = asyncio.create_task(self.send_audio_to_deepgram()) @@ -91,7 +107,19 @@ async def on_message(self, client, result, **kwargs): transcription = result.channel.alternatives[0].transcript if len(transcription) == 0: return - timestamp = result.start + if not self._first_transcript_logged: + 
logger.info( + f"First Deepgram transcript for {self.uniqueid} " + f"(is_final={result.is_final}, len={len(transcription)})" + ) + self._first_transcript_logged = True + # Use wall-clock elapsed since transcription start to avoid drift + # caused by buffered audio being processed faster/slower than realtime. + if self.transcription_start_monotonic is None: + stream_elapsed = float(result.start) + else: + stream_elapsed = max(0.0, time.monotonic() - self.transcription_start_monotonic) + timestamp = stream_elapsed + float(self.call_elapsed_at_start) if result.channel_index[0] == 0: speaker_name = self.speaker_name_in speaker_number = self.speaker_number_in @@ -161,8 +189,9 @@ async def read_audio_from_rtp(self): Read audio from RTP stream """ try: - target_size = 5120 - timeout = 0.25 # 250ms timeout + # Keep chunks relatively small to reduce latency to first transcript. + target_size = 1600 + timeout = 0.10 while self.connected: # Read audio data from both streams till target size or timeout is reached buffer_in = bytearray() @@ -275,4 +304,3 @@ async def close(self): "raw_transcription": text }) ) - diff --git a/main.py b/main.py index 84c27ba..17a62c9 100644 --- a/main.py +++ b/main.py @@ -82,9 +82,26 @@ async def realtime_call_transcription(): rtp_server=rtp_server ) + async def handle_mqtt_control(topic, payload): + """Handle transcription control commands from middleware.""" + if not isinstance(payload, dict): + return + + action = payload.get("action") + call_id = payload.get("linkedid") or payload.get("uniqueid") + if not isinstance(action, str) or not isinstance(call_id, str) or not call_id: + return + + if action == "start": + await asterisk_bridge.start_transcription(call_id) + elif action == "stop": + await asterisk_bridge.stop_transcription(call_id) + # Start services logger.info("Starting services...") + mqtt_client.set_callback(handle_mqtt_control) await mqtt_client.connect() + await mqtt_client.subscribe("transcription/control") await 
asterisk_bridge.connect() logger.info("All services started") @@ -110,4 +127,3 @@ async def realtime_call_transcription(): server_thread.start() # Run the realtime call transcription pipeline asyncio.run(realtime_call_transcription()) - From d80eb15ffc6b27def9c68010ccebcc243947fef6 Mon Sep 17 00:00:00 2001 From: Stefano Date: Thu, 26 Feb 2026 15:49:42 +0100 Subject: [PATCH 2/2] Update README with MQTT transcription control instructions Added instructions for starting and stopping transcription via MQTT. --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 12dc9d9..d58468d 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,8 @@ This requires the `vector` extension (pgvector) in your Postgres instance. 2. Make sure your MQTT broker is running 3. Run the application: `python main.py` 4. Configure Asterisk dialplan to direct calls to the Stasis application named "satellite" +5. Send an MQTT message to the topic `transcription/control` with payload `{"action":"start", "uniqueid":"[CALL_UNIQUEID]"}` or `{"action":"start", "linkedid":"[CALL_LINKEDID]"}` +6. Stop the transcription with an MQTT message to the topic `transcription/control` with payload `{"action":"stop", "uniqueid":"[CALL_UNIQUEID]"}` or `{"action":"stop", "linkedid":"[CALL_LINKEDID]"}` ### REST API