-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontinuous_audio_stream.py
More file actions
140 lines (114 loc) · 4.6 KB
/
continuous_audio_stream.py
File metadata and controls
140 lines (114 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Continuous Audio Stream - Single microphone stream shared by multiple processors
Solves device conflict by opening the audio device ONCE and distributing audio to subscribers
"""
import logging
import threading
import queue
import numpy as np
import sounddevice as sd
from typing import Optional, Callable, List
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class ContinuousAudioStream:
    """
    Continuous audio stream that reads from microphone and distributes to multiple subscribers.

    Opens device ONCE and keeps it open, avoiding device conflicts.

    Subscribers are plain callables registered with ``add_subscriber``. Every
    captured chunk is copied once and the same copy is handed to each
    subscriber in registration order, so subscribers should treat the chunk
    as read-only.
    """

    def __init__(self,
                 sample_rate: int = 48000,
                 channels: int = 1,
                 audio_device: Optional[int] = None,
                 chunk_size: int = 1024):
        """
        Initialize continuous audio stream.

        The audio device is not opened here; call ``start()`` for that.

        Args:
            sample_rate: Audio sample rate
            channels: Number of audio channels
            audio_device: Audio input device index (passed through as-is to
                ``sd.InputStream``; ``None`` is forwarded unchanged)
            chunk_size: Size of audio chunks to read (used as the stream
                ``blocksize``)
        """
        self.sample_rate = sample_rate
        self.channels = channels
        self.audio_device = audio_device
        self.chunk_size = chunk_size

        # Audio stream (None while the device is not open)
        self.stream = None
        self.running = False

        # Subscribers: list of (callback, name) pairs that receive audio chunks.
        # Guarded by subscriber_lock because the audio callback reads it while
        # other threads may add/remove entries.
        self.subscribers: List[tuple] = []
        self.subscriber_lock = threading.Lock()

        logger.info("ContinuousAudioStream created:")
        logger.info(f" Sample rate: {sample_rate}Hz")
        logger.info(f" Channels: {channels}")
        logger.info(f" Device: {audio_device}")
        logger.info(f" Chunk size: {chunk_size}")

    def add_subscriber(self, callback: Callable[[np.ndarray, int], None], name: str = "unnamed"):
        """
        Add a subscriber that will receive audio chunks.

        Args:
            callback: Function(audio_chunk, sample_rate) to receive audio
            name: Name for logging
        """
        with self.subscriber_lock:
            self.subscribers.append((callback, name))
        logger.info(f"📡 Added audio subscriber: {name}")

    def remove_subscriber(self, callback: Callable[[np.ndarray, int], None]):
        """
        Remove a subscriber.

        Every registration whose callback compares equal to ``callback`` is
        dropped; removing an unknown callback is a no-op. A chunk that is
        already being dispatched may still reach the removed callback once.
        """
        with self.subscriber_lock:
            # Equality (not identity) so that bound methods match themselves.
            self.subscribers = [(cb, name) for cb, name in self.subscribers if cb != callback]

    def _audio_callback(self, indata, frames, time_info, status):
        """
        Receive one block of audio from the stream and fan it out.

        This is the callback handed to ``sd.InputStream``; the parameter list
        follows what that library passes. Only ``indata`` and ``status`` are
        used here.
        """
        if status:
            logger.debug(f"Audio callback status: {status}")

        # Copy audio data: the buffer belongs to the caller and must not be
        # retained by subscribers.
        audio_chunk = indata.copy()

        # Snapshot under the lock, dispatch outside it. Holding the
        # (non-reentrant) lock while running subscriber code would deadlock
        # any subscriber that adds/removes a subscriber from its callback and
        # would stall other threads for as long as a subscriber runs.
        with self.subscriber_lock:
            targets = list(self.subscribers)

        for callback, name in targets:
            try:
                callback(audio_chunk, self.sample_rate)
            except Exception as e:
                # One failing subscriber must not starve the others.
                logger.error(f"Error in subscriber '{name}': {e}")

    def start(self):
        """
        Start the continuous audio stream.

        Does nothing (besides a warning) if the stream is already running.

        Raises:
            Exception: whatever opening or starting the input stream raised;
                the error is logged, any half-opened stream is closed, and the
                exception is re-raised.
        """
        if self.running:
            logger.warning("Audio stream already running")
            return

        stream = None
        try:
            logger.info("🎤 Starting continuous audio stream...")

            # Open audio stream
            stream = sd.InputStream(
                device=self.audio_device,
                samplerate=self.sample_rate,
                channels=self.channels,
                blocksize=self.chunk_size,
                callback=self._audio_callback,
                dtype=np.int16
            )
            self.stream = stream

            # Start stream
            stream.start()
            self.running = True

            logger.info("✅ Continuous audio stream started")
            logger.info(f" Device: {self.audio_device}")
            logger.info(f" Sample rate: {self.sample_rate}Hz")
        except Exception as e:
            logger.error(f"Failed to start audio stream: {e}", exc_info=True)
            # Release the device if it was opened but could not be started,
            # otherwise it stays claimed and a retry may fail.
            if stream is not None:
                try:
                    stream.close()
                except Exception as close_error:
                    logger.debug(f"Error closing audio stream after failed start: {close_error}")
            self.stream = None
            self.running = False
            raise

    def stop(self):
        """
        Stop the continuous audio stream.

        No-op when the stream is not running. Errors raised while stopping or
        closing are logged, not propagated; in every case the instance ends up
        in the stopped state so ``start()`` can be called again.
        """
        if not self.running:
            return

        logger.info("🛑 Stopping continuous audio stream...")

        # Detach first so the instance never keeps a reference to a stream
        # that is being (or failed to be) torn down.
        stream, self.stream = self.stream, None
        try:
            if stream is not None:
                try:
                    stream.stop()
                finally:
                    # Always release the device, even if stop() failed.
                    stream.close()
            logger.info("✅ Continuous audio stream stopped")
        except Exception as e:
            logger.error(f"Error stopping audio stream: {e}")
        finally:
            self.running = False

    def is_running(self) -> bool:
        """Check if stream is running."""
        return self.running