Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ def __init__(
keyterms_prompt: NotGivenOr[list[str]] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
buffer_size_seconds: float = 0.05,
endpoint_url: str = "wss://streaming.assemblyai.com/v3/ws",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want this to be a base_url instead? do you expect the /v3/ws parameter to be modifiable.

):
"""
Args:
endpoint_url: The AssemblyAI streaming WebSocket endpoint URL. Use the EU endpoint
(wss://streaming.eu.assemblyai.com/v3/ws) for streaming in the EU. Defaults to
wss://streaming.assemblyai.com/v3/ws.
"""
super().__init__(
capabilities=stt.STTCapabilities(
streaming=True,
Expand All @@ -86,6 +93,7 @@ def __init__(
offline_recognize=False,
),
)
self._endpoint_url = endpoint_url
assemblyai_api_key = api_key if is_given(api_key) else os.environ.get("ASSEMBLYAI_API_KEY")
if not assemblyai_api_key:
raise ValueError(
Expand Down Expand Up @@ -145,6 +153,7 @@ def stream(
opts=config,
api_key=self._api_key,
http_session=self.session,
endpoint_url=self._endpoint_url,
)
self._streams.add(stream)
return stream
Expand Down Expand Up @@ -189,12 +198,14 @@ def __init__(
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
endpoint_url: str,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this to the conn_options?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not quite sure what you mean.
The conn_options is a shared object not in the AssemblyAI module. So i can't just add an other property here: https://github.com/livekit/agents/blob/main/livekit-agents/livekit/agents/types.py#L54

Could you suggest how/what to change?
Thank you!

) -> None:
super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)

self._opts = opts
self._api_key = api_key
self._session = http_session
self._endpoint_url = endpoint_url
self._speech_duration: float = 0
self._last_preflight_start_time: float = 0
self._reconnect_event = asyncio.Event()
Expand Down Expand Up @@ -346,13 +357,12 @@ async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
"User-Agent": "AssemblyAI/1.0 (integration=Livekit)",
}

ws_url = "wss://streaming.assemblyai.com/v3/ws"
filtered_config = {
k: ("true" if v else "false") if isinstance(v, bool) else v
for k, v in live_config.items()
if v is not None
}
url = f"{ws_url}?{urlencode(filtered_config)}"
url = f"{self._endpoint_url}?{urlencode(filtered_config)}"
ws = await self._session.ws_connect(url, headers=headers)
Comment on lines 360 to 366
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Build the WS URL safely if endpoint_url already has query params.

f"{self._endpoint_url}?..." will produce an invalid URL if the caller includes existing query parameters (double ?). Safer to merge queries.

🛠️ Proposed fix
-from urllib.parse import urlencode
+from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
@@
-        url = f"{self._endpoint_url}?{urlencode(filtered_config)}"
+        split = urlsplit(self._endpoint_url)
+        query = dict(parse_qsl(split.query))
+        query.update(filtered_config)
+        url = urlunsplit((split.scheme, split.netloc, split.path, urlencode(query), split.fragment))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
filtered_config = {
k: ("true" if v else "false") if isinstance(v, bool) else v
for k, v in live_config.items()
if v is not None
}
url = f"{ws_url}?{urlencode(filtered_config)}"
url = f"{self._endpoint_url}?{urlencode(filtered_config)}"
ws = await self._session.ws_connect(url, headers=headers)
filtered_config = {
k: ("true" if v else "false") if isinstance(v, bool) else v
for k, v in live_config.items()
if v is not None
}
split = urlsplit(self._endpoint_url)
query = dict(parse_qsl(split.query))
query.update(filtered_config)
url = urlunsplit((split.scheme, split.netloc, split.path, urlencode(query), split.fragment))
ws = await self._session.ws_connect(url, headers=headers)
🤖 Prompt for AI Agents
In `@livekit-plugins/livekit-plugins-assemblyai/livekit/plugins/assemblyai/stt.py`
around lines 359 - 365, The URL building currently concatenates
self._endpoint_url and a new query string which breaks if self._endpoint_url
already contains query params; instead parse self._endpoint_url (using
urllib.parse.urlparse), merge its existing query params (urllib.parse.parse_qs)
with the filtered_config dict (ensuring boolean -> "true"/"false" conversion
already in filtered_config), flatten/encode the combined params with
urllib.parse.urlencode(doseq=True), reconstruct the full URL with
urllib.parse.urlunparse, and then call await self._session.ws_connect(url,
headers=headers); update the logic around filtered_config, self._endpoint_url,
urlencode, and the ws_connect call to use the merged-safe URL.

return ws

Expand Down