-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathaudio_stream.py
More file actions
228 lines (178 loc) · 7.91 KB
/
audio_stream.py
File metadata and controls
228 lines (178 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
Audio Stream Example -- Full DIY Pipeline (Plivo Audio Streaming Protocol)
Config: no stt/llm/tts (raw audio relay, you handle everything)
Plivo is just the telephony bridge. You get raw audio frames and handle
everything yourself: STT, LLM, TTS, VAD, turn detection.
This is the escape hatch for customers running:
- Speech-to-speech models (e.g. OpenAI Realtime API)
- Custom voice AI pipelines on their own infra
- Non-standard audio processing (music, sound effects, etc.)
Features demonstrated:
- VoiceApp server pattern (Plivo connects to you)
- Full Plivo Audio Streaming protocol compatibility
- Sync handlers with per-session state (session.data)
- Audio echo bot (buffers audio, plays it back)
- Checkpoint events for playback tracking
- clearAudio for interruption
Protocol (Plivo Audio Streaming):
Inbound events (server -> you):
- start: Stream metadata (callId, streamId, mediaFormat, etc.)
- media: Audio chunk (base64 payload, ~20ms per chunk)
- dtmf: DTMF digit detected
- playedStream: Checkpoint reached (audio before this point finished playing)
- clearedAudio: Audio queue was cleared
- stop: Stream ended
Outbound events (you -> server):
- playAudio: Send audio to the caller (base64 payload)
- checkpoint: Mark a playback position (triggers playedStream when reached)
- clearAudio: Clear all queued audio (for interruption)
Platform session events (also received):
- session.started: Session metadata (agent_session_id, call_id)
- session.ended: Session ended (duration_seconds)
Usage:
1. pip install plivo_agentstack[all]
2. Set PLIVO_AUTH_ID, PLIVO_AUTH_TOKEN env vars
3. python audio_stream.py
"""
import asyncio
import os
import time
from plivo_agentstack import AsyncClient
from plivo_agentstack.agent import (
AgentSessionEnded,
AgentSessionStarted,
ClearedAudio,
Error,
PlayedStream,
StreamDtmf,
StreamMedia,
StreamStart,
StreamStop,
VoiceApp,
)
PLIVO_AUTH_ID = os.environ.get("PLIVO_AUTH_ID", "")
PLIVO_AUTH_TOKEN = os.environ.get("PLIVO_AUTH_TOKEN", "")
BASE_URL = os.environ.get("PLIVO_API_URL", "https://api.plivo.com")
CALLBACK_HOST = os.environ.get("CALLBACK_HOST", "http://localhost:9001")
PLIVO_NUMBER = os.environ.get("PLIVO_NUMBER", "")
plivo = AsyncClient(PLIVO_AUTH_ID, PLIVO_AUTH_TOKEN, base_url=BASE_URL)
async def init_agent():
agent = await plivo.agent.agents.create(
agent_name="Audio Echo Bot",
audio_format="mulaw_8k",
audio_channels=1,
websocket_url="ws://localhost:9000/ws",
# Plivo Audio Streaming XML parameters
# All fields map to Plivo Stream XML attributes (snake_case here,
# converted to camelCase in the XML by the platform).
stream={
"extra_headers": {"userId": "12345", "tenant": "acme"},
# custom key-value pairs -> Plivo extraHeaders
# keys/values must be alphanumeric, max 512 bytes
# agentUuid is always added automatically
# "stream_timeout": 86400, # max stream duration in seconds (default: 86400)
# "content_type": "audio/x-mulaw;rate=8000", # audio codec
# "noise_cancellation": False, # Plivo-side NC (default: false)
# "noise_cancellation_level": 85, # NC intensity 60-100 (if NC enabled)
},
callbacks={
"hangup": {"url": f"{CALLBACK_HOST}/callbacks/hangup", "method": "POST"},
"recording": {"url": f"{CALLBACK_HOST}/callbacks/recording", "method": "POST"},
"ring": {"url": f"{CALLBACK_HOST}/callbacks/ring", "method": "POST"},
# "stream_status": {"url": ..., "method": "POST"},
},
)
agent_uuid = agent["agent_uuid"]
print(f"Agent created: {agent_uuid}")
# Assign a phone number to this agent (for inbound call routing)
if PLIVO_NUMBER:
await plivo.agent.numbers.assign(agent_uuid, PLIVO_NUMBER)
print(f"Number {PLIVO_NUMBER} assigned to agent")
numbers = await plivo.agent.numbers.list(agent_uuid)
print(f"Agent numbers: {numbers['numbers']}")
print(f"Call {PLIVO_NUMBER} to reach the echo bot")
# --- Event handlers ---
app = VoiceApp()
@app.on("session.started")
def on_session_started(session, event: AgentSessionStarted):
"""Platform session started -- receive session metadata."""
print(f"Session started: {session.agent_session_id}")
@app.on("start")
def on_start(session, event: StreamStart):
"""Plivo stream started -- receive audio stream metadata.
StreamStart flattens the nested Plivo protocol into direct attributes:
event.stream_id, event.call_id, event.content_type, event.sample_rate
"""
session.data["echo_buffer"] = []
session.data["echo_playing"] = False
session.data["encoding"] = event.content_type or "audio/x-mulaw"
session.data["sample_rate"] = event.sample_rate or 8000
print(
f"Stream started: streamId={session.stream_id} "
f"callId={event.call_id} "
f"format={session.data['encoding']} "
f"rate={session.data['sample_rate']}"
)
@app.on("media")
def on_media(session, event: StreamMedia):
"""Plivo audio chunk received (~20ms of audio).
StreamMedia flattens the nested Plivo payload:
event.payload -- base64-encoded audio
event.content_type, event.sample_rate, event.timestamp
"""
# Buffer incoming audio chunks
session.data["echo_buffer"].append(event.payload)
# After collecting enough chunks (~2 seconds), play them back
if len(session.data["echo_buffer"]) >= 100 and not session.data["echo_playing"]:
# Buffer full -- play back the echo
session.data["echo_playing"] = True
print(f" Playing echo: {len(session.data['echo_buffer'])} chunks")
for chunk_b64 in session.data["echo_buffer"]:
session.send_media(
chunk_b64,
content_type=session.data.get("encoding", "audio/x-mulaw"),
sample_rate=session.data.get("sample_rate", 8000),
)
time.sleep(0.020) # 20ms pacing -- sync handler, runs in thread
# Place a checkpoint so we know when echo playback is done
session.send_checkpoint("echo-complete")
session.data["echo_buffer"].clear()
session.data["echo_playing"] = False
@app.on("dtmf")
def on_dtmf(session, event: StreamDtmf):
"""DTMF digit detected.
In audio stream mode, DTMF arrives as a Plivo Audio Streaming event
({"event": "dtmf"}). In managed pipeline mode, use @app.on("user.dtmf").
parse_event handles the Plivo nesting automatically.
"""
print(f" DTMF: {event.digit}")
if event.digit == "*":
# Clear all queued audio (tests clearAudio -> clearedAudio roundtrip)
print(" Clearing audio queue...")
session.clear_audio()
elif event.digit == "#":
session.hangup()
@app.on("playedStream")
def on_played_stream(session, event: PlayedStream):
"""Checkpoint reached -- audio before this point finished playing.
event.name matches what was passed to send_checkpoint().
"""
print(f" Checkpoint reached: {event.name}")
@app.on("clearedAudio")
def on_cleared_audio(session, event: ClearedAudio):
"""Audio queue was cleared (after a clearAudio command)."""
print(" Audio cleared")
@app.on("session.error")
def on_error(session, event: Error):
print(f" Error [{event.code}]: {event.message}")
@app.on("stop")
def on_stop(session, event: StreamStop):
"""Plivo stream stopped -- call ended or stream closed."""
print("Stream stopped")
@app.on("session.ended")
def on_ended(session, event: AgentSessionEnded):
"""Platform session ended -- final event with duration."""
print(f"Session ended: {event.duration_seconds}s")
if __name__ == "__main__":
asyncio.run(init_agent())
app.run(port=9000)