diff --git a/haiopy/__init__.py b/haiopy/__init__.py index 4688356..714e221 100644 --- a/haiopy/__init__.py +++ b/haiopy/__init__.py @@ -5,3 +5,14 @@ __author__ = """The pyfar developers""" __email__ = 'info@pyfar.org' __version__ = '0.1.0' + + +from . import ( + buffers, + devices, +) + +__all__ = [ + 'buffers', + 'devices', +] diff --git a/haiopy/devices.py b/haiopy/devices.py index fec93b9..0261a76 100644 --- a/haiopy/devices.py +++ b/haiopy/devices.py @@ -1,96 +1,819 @@ -import sounddevice as sd # noqa: F401 TODO: remove this after implementation +from multiprocessing import Event +import numpy as np +import sys +import sounddevice as sd +from abc import abstractmethod, ABCMeta +import platform +from haiopy.buffers import EmptyBuffer +from haiopy.buffers import _Buffer -def list_devices(): - pass +_valid_apis_windows = { + 'mme': 0, + 'windows directsound': 1, + 'directsound': 1, + 'asio': 2, + 'windows wasapi': 3, + 'wasapi': 3, + 'windows wdm-ks': 4, + 'wdm': 4, +} +_valid_apis_linux = [ + 'alsa', + 'oss', + 'pulse', + 'jack'] +_valid_apis_darwin = [ + 'coreaudio'] -class _Device(object): +_valid_apis = { + 'Windows': _valid_apis_windows, + 'Linux': _valid_apis_linux, + 'Darwin': _valid_apis_darwin, +} + +_default_apis = { + 'Windows': 2, + 'Linux': 1, + 'Darwin': 0, +} + + +def query_devices( + device: int | str = None, + kind: str | None = None, + host_api: str | None = None): + """Query the devices available on the system. + + Parameters + ---------- + device : int | str + The device to be queried. If None, all available devices are returned. + kind : str + The kind of device to be queried. Can be 'input' or 'output'. + host_api : str + The host API to be used. If None, the default API is used, which depends on the + operating system. For Windows, the default API is ASIO, for Linux it is ALSA, + and for macOS it is CoreAudio. + """ + + if device is None and host_api is None: + return sd.query_devices(device, kind) + + # if device is None: + if host_api is not None: + device_ids = sd.query_hostapis( + _valid_apis[platform.system()][host_api.lower()])['devices'] + else: + device_ids = sd.query_hostapis( + _default_apis[platform.system()])['devices'] + + device_list = [] + for dev_id in device_ids: + if device is None: + try: + device_list.append(sd.query_devices(dev_id, kind)) + except ValueError: + continue + elif device.lower() in sd.query_devices(dev_id)['name'].lower(): + device_list.append(sd.query_devices(dev_id, kind)) + + if len(device_list) > 1: + return sd.DeviceList(device_list) + + elif len(device_list) == 1: + if kind == 'output': + audio_device = OutputAudioDevice( + identifier=device_list[0]['index'], + sampling_rate=device_list[0]['default_samplerate'], + ) + else: + raise ValueError('Unsupported device type.') + + return audio_device + + +class _Device(metaclass=ABCMeta): def __init__( self, - id, + name, sampling_rate, block_size, dtype): super().__init__() + self._name = name + self._sampling_rate = sampling_rate + self._block_size = block_size + self._dtype = dtype @property def name(self): - raise NotImplementedError('Abstract method') - - def playback(): - pass + return self._name - def record(): - pass + @property + def sampling_rate(self): + """Sampling rate of the device.""" + return self._sampling_rate - def playback_record(): - pass + @sampling_rate.setter + @abstractmethod + def sampling_rate(self, sampling_rate): + """Set the sampling rate of the device.""" + raise NotImplementedError('Needs to be implemented in child class.') - def initialize_playback(): - pass + @property + def block_size(self): + """Block size used by the device.""" + return self._block_size - def initialize_record(): - pass + @block_size.setter + @abstractmethod + def block_size(self, block_size): + """Set the block size of the device.""" + raise NotImplementedError('Needs to be implemented in child class.') - def initialize_playback_record(): - pass + @property + def dtype(self): + """Data type of the devices audio buffer.""" + return self._dtype - def abort(): - pass + @dtype.setter + @abstractmethod + def dtype(self, dtype): + """Set the data type of the device.""" + raise NotImplementedError('Needs to be implemented in child class.') class AudioDevice(_Device): + """Abstract class implementing audio devices based on python-sounddevice. + """ + def __init__( self, - id, - sampling_rate, - block_size, - dtype, - latency=None, - extra_settings=None, - # finished_callback=None, - clip_off=None, - dither_off=None, - never_drop_input=None, - prime_output_buffers_using_stream_callback=None + identifier=0, + sampling_rate=44100, + block_size=512, + dtype='float32', ): - super().__init__(id, sampling_rate, block_size, dtype) + + identifier = sd.query_devices(identifier)['name'] + + super().__init__( + name=sd.query_devices(identifier)['name'], + sampling_rate=sampling_rate, + block_size=block_size, + dtype=dtype, + ) + self._identifier = identifier + + self._callback = None + self._stream = None + self._input_buffer = None + self._output_buffer = None + + self._stream_finished = Event() @property - def stream(): - pass + def identifier(self): + """The identifier of the device.""" + return self._identifier - @staticmethod - def callback(): - pass + @abstractmethod + def check_settings(): + """Check if settings are compatible with the physical device. + """ + raise NotImplementedError('Needs to be implemented in child class.') - def playback(data): - # fill queue, stream.start() - pass + @property + def name(self): + """The name of the device.""" + return self._name - def record(n_samples): - # stream start, read into the queue - pass + @property + def stream(self): + """The sounddevice audio stream. + """ + return self._stream - def playback_record(data): - # see combination above - pass + def _stream_active(self): + """Check if the stream is active.""" + return self.stream.active if self.stream is not None else False - def initialize_playback(channels): - # init queue, define callback, init stream + def finished_callback(self) -> None: + """Custom callback after a audio stream has finished. + Can be overwritten by users. + """ pass - def initialize_record(channels): - pass + def _finished_callback(self) -> None: + """Private portaudio callback after a audio stream has finished. - def initialize_playback_record(input_channels, output_channels): - pass + Ensures that the buffer is stopped. + """ + self._stream_finished.set() + self.finished_callback() + self.stream.stop() - def abort(): - # abort - pass + def start(self): + """Start the audio stream and consume the buffer.""" + if self.stream.closed: + print("Stream is closed. Try re-initializing.", file=sys.stderr) + return - def close(): - # remove stream - pass + elif not self.stream.active: + self._stream_finished.clear() + self.stream.start() + else: + print("Stream is already active.", file=sys.stderr) + + def wait(self): + """Wait for the audio stream to finish the buffer.""" + self._stream_finished.wait(timeout=None) + + def abort(self): + """Stop the audio steam without finishing remaining callbacks.""" + if self.stream.active is True: + self.stream.abort() + self._stop_buffer() + + def close(self): + """Close the audio device and release the sound card lock.""" + if self.stream is not None: + self.stream.close() + self._stop_buffer() + + def stop(self): + """Stop the audio stream after finishing all remaining callbacks.""" + if self.stream.active is True: + self.stream.stop() + self._stop_buffer() + + @abstractmethod + def _stop_buffer(self): + raise NotImplementedError() + + @abstractmethod + def _close_stream(self): + raise NotImplementedError() + + def __del__(self): + """Destructor for the AudioDevice. + Closes the sounddevice stream. + """ + if self._stream_active(): + self.stream.close() + + +class _ChannelMapping(metaclass=ABCMeta): + """Class to handle the channel mapping of the device. + + Parameters + ---------- + channels : list + The channels to be used by the device. + """ + + _valid_apis_windows = [ + 'asio', + 'windows directsound', 'directsound', + 'windows wdm-ks', 'wdm', + 'windows wasapi', 'wsapi'] + _valid_apis_linux = [ + 'alsa', + 'oss', + 'pulse', + 'jack'] + _valid_apis_darwin = [ + 'coreaudio'] + + _valid_apis = { + 'Windows': _valid_apis_windows, + 'Linux': _valid_apis_linux, + 'Darwin': _valid_apis_darwin, + } + + _default_apis = { + 'Windows': 'asio', + 'Linux': 'alsa', + 'Darwin': 'coreaudio', + } + + def __init__( + self, + channels: list[int], + n_channels_device: int, + api: str): + + if api.lower() not in self._valid_apis[platform.system()]: + raise ValueError( + f"Invalid driver {api}. For your platform supported drivers" + f" are: f{self._valid_apis[platform.system()]}") + + self._api = api.lower() + self._n_channels_device = n_channels_device + self.channels = channels + + @property + def channels(self) -> list[int]: + """The channels to be used by the device.""" + return self._channels + + @channels.setter + @abstractmethod + def channels(self, channels: list[int]): + """Set the channels to be used by the device.""" + raise NotImplementedError() + + @property + def n_channels_used(self) -> int: + """The number of channels containing data.""" + return len(self._channels) + + @property + def n_channels_device(self) -> int: + """The number of channels supported by the device.""" + return self._n_channels_device + + @n_channels_device.setter + def n_channels_device(self, n_channels_device: int): + """Set the number of channels supported by the device.""" + self._n_channels_device = n_channels_device + + @property + def n_channels_mapping(self) -> int: + """The number of output channels required for the stream. + + This includes a number of unused pre-pended channels which need to be + filled with zeros before writing the portaudio buffer. In case of + using only the first channel, portaudio plays back a mono signal, + which will be broadcast to the first two channels. To avoid this, + the minimum number of channels opened is always two, the unused second + channel is filled with zeros. + """ + if self.extra_settings is not None: + return self.n_channels_used + else: + return np.max((2, np.max(self._channels) + 1)) + + @property + def extra_settings(self) -> sd.AsioSettings | sd.CoreAudioSettings | None: + """The sounddevice extra settings for the device. + + These are specific to python-sounddevice and are used when opening the + portaudio stream. + """ + return self._extra_settings + + def __call__( + self, + data_buffer: np.ndarray[float]) -> np.ndarray[float]: + """Apply the mapping defined by the object to a data buffer. + + Parameters + ---------- + data_buffer : np.ndarray[float] + The input data buffer with shape (n_channels, block_size). + + Returns + ------- + np.ndarray[float] + The output data buffer with shape (block_size, n_channels_mapping). + """ + + if self._api in ['asio', 'coreaudio']: + # ASIO and CoreAudio handle the routing + return np.atleast_2d(data_buffer).T + + # Write a block to an array with all required output channels + # including zeros for unused channels. Required if the routing + # is not handled by ASIO or CoreAudio. Sounddevice alone does + # not support routing matrices + data = data_buffer + block_size = data_buffer.shape[-1] + + size_matches = block_size == self.n_channels_mapping + + if not self._stream_block_out or not size_matches: + self._stream_block_out = np.zeros( + (self.n_channels_mapping, block_size), + dtype=data.dtype) + self._stream_block_out[self.channels] = data + + return self._stream_block_out + + +class InputChannelMapping(_ChannelMapping): + """Class to handle the input channel mapping of an audio device. + + Examples + -------- + Create a mapping for a device with 8 channels and use only the second + channel. The input data is a 2D array with shape (1, 512). + After calling the mapping, the output data is a 2D array with shape + (512, 2), where the first channel is filled with zeros and the second + channel is filled with the input data. + + >>> import numpy as np + >>> from haiopy.devices import InputChannelMapping + >>> input_data = np.random.randn((1, 512), dtype='float32') + >>> device = InputChannelMapping( + ... channels=[2], + ... n_channels_device=8, + ... api='wasapi') + >>> device(input_data) + + """ + + def __init__( + self, + channels: list[int], + n_channels_device: int, + api: str): + super().__init__(channels, n_channels_device, api) + + @_ChannelMapping.channels.setter + def channels(self, channels: list[int]): + """Set the channels to be used by the device.""" + if np.any(np.asarray(channels) > self.n_channels_device): + raise ValueError( + f"Invalid channels {channels}. The device only supports " + f"{self.n_channels_device} channels.") + + if 'asio' in self._api: + extra_settings = sd.AsioSettings( + channel_selectors=channels) + + elif 'coreaudio' in self._api: + extra_settings = sd.CoreAudioSettings( + channel_map=channels) + else: + extra_settings = None + self._stream_block_out = None + + self._channels = channels + self._extra_settings = extra_settings + + +class OutputChannelMapping(_ChannelMapping): + """Class to handle the output channel mapping of an audio device. + + Examples + -------- + Create a mapping for a device with 8 channels and use only the second + channel. The input data is a 2D array with shape (1, 512). + After calling the mapping, the output data is a 2D array with shape + (512, 2), where the first channel is filled with zeros and the second + channel is filled with the input data. + + >>> import numpy as np + >>> from haiopy.devices import InputChannelMapping + >>> input_data = np.random.randn((1, 512), dtype='float32') + >>> device = InputChannelMapping( + ... channels=[2], + ... n_channels_device=8, + ... api='wasapi') + >>> device(input_data) + + """ + + def __init__( + self, + channels: list[int], + n_channels_device: int, + api: str): + super().__init__(channels, n_channels_device, api) + + @_ChannelMapping.channels.setter + def channels(self, channels: list[int]): + """Set the channels to be used by the device.""" + if np.any(np.asarray(channels) > self.n_channels_device): + raise ValueError( + f"Invalid channels {channels}. The device only supports " + f"{self.n_channels_device} channels.") + + if 'asio' in self._api: + extra_settings = sd.AsioSettings( + channel_selectors=channels) + + elif 'coreaudio' in self._api: + channel_map = np.ones( + self.n_channels_device, dtype=int) * -1 + channel_map[channels] = channels + + extra_settings = sd.CoreAudioSettings( + channel_map=channel_map) + else: + extra_settings = None + self._stream_block_out = None + + self._channels = channels + self._extra_settings = extra_settings + + +class OutputAudioDevice(AudioDevice): + """Class implementing an output audio device. + + The implementation is based on python-sounddevice and portaudio. + + """ + + def __init__( + self, + identifier=sd.default.device['output'], + sampling_rate=44100, + block_size=512, + channels=[0], + dtype='float32', + output_buffer=None, + ): + + # First check the settings before continuing + n_channels = len(channels) + sd.check_output_settings( + device=identifier, + channels=np.max([n_channels, np.max(channels)+1]), + dtype=dtype, + samplerate=sampling_rate) + + # Init base class + super().__init__( + identifier=identifier, + sampling_rate=sampling_rate, + block_size=block_size, + dtype=dtype) + + # Set the output channel mapping which is specific for each host api + self._output_channel_mapping = OutputChannelMapping( + channels, self.max_channels_output, self.host_api) + + # Set the output buffer which will be consumed in the callback + # function. If no buffer is given, an empty buffer is created. + self._output_buffer = None + if output_buffer is None: + output_buffer = EmptyBuffer( + block_size, n_channels, sampling_rate) + self.output_buffer = output_buffer + + # Initialize the device + self.initialize() + + @property + def name(self) -> str: + """The name of the device.""" + return sd.query_devices(self.identifier)['name'] + + @property + def host_api(self) -> str: + """The host API used by the device.""" + return sd.query_hostapis( + sd.query_devices(self.identifier)['hostapi'])['name'] + + def check_settings( + self, + n_channels: list[int] = None, + sampling_rate: int = None, + dtype: np.int8 | np.int16 | np.int32 | np.float32 = None, + extra_settings: sd.CoreAudioSettings | sd.AsioSettings | None = None): + """Check if settings are compatible with the physical device. + + Parameters + ---------- + n_channels : int + The number of channels to be used + sampling_rate : int + The audio sampling rate + dtype : np.float32, np.int8, np.int16, np.int32 + The audio buffer data type + extra_settings : extra settings + Audio API specific settings. + + Raises + ------ + PortAudioError + If the settings are incompatible with the device an exception is raised. + """ + sd.check_output_settings( + device=self.identifier, + channels=n_channels, + dtype=dtype, + extra_settings=extra_settings, + samplerate=sampling_rate) + + @property + def output_channel_mapping(self): + """The channel mapping of the device.""" + return self._output_channel_mapping + + @property + def output_channels(self) -> list[int]: + """The output channels of the device. + """ + return self.output_channel_mapping.channels + + @output_channels.setter + def output_channels(self, channels): + """Set the output channels of the device. + + Parameters + ---------- + channels : list + The output channels to be used by the device. + """ + + if self._stream_active() or self._buffer_active(): + raise ValueError("The device is currently in use and needs to be closed first") + self._close_stream() + self.output_channel_mapping.channels = channels + + @property + def n_channels_output(self): + """The total number of output channels. + + Returns + ------- + int + The number of output channels + """ + return self.output_channel_mapping.n_channels_used + + @property + def max_channels_output(self): + """The number of output channels supported by the device.""" + return sd.query_devices( + self.identifier, 'output')['max_output_channels'] + + def output_callback( + self, + outdata: np.ndarray[float], + frames: int, + timestamp, # noqa: ARG002 + status: sd.CallbackFlags) -> None: + """Portudio callback for output streams. + + Parameters + ---------- + outdata : array + Output buffer view + frames : int + Length of the buffer + timestamp : PaTimestamp + Timestamp of the callback event. This is a struct implemented in c and not directly + available. Only listed here since the number and order are required by portaudio. + status : sounddevice.CallbackFlags + Portaudio status flags + + Raises + ------ + sd.CallbackAbort + Abort the playback if a buffer underflow occurs. + sd.CallbackStop + Stop the playback if the output queue is empty. + """ + assert frames == self.block_size + if status.output_underflow: + print('Output underflow: increase blocksize?', file=sys.stderr) + raise sd.CallbackAbort('Buffer underflow') + assert not status + + try: + outdata[:] = self.output_channel_mapping(next(self.output_buffer)).T + except StopIteration as e: + raise sd.CallbackStop("Buffer empty") from e + + def initialize(self) -> None: + """Initialize and open the playback stream. + This will set the device to active, potentially blocking the soundcard for other + applications. Playback is not yet started. + """ + + ostream = sd.OutputStream( + samplerate=self.sampling_rate, + blocksize=self.block_size, + device=self.identifier, + channels=self.output_channel_mapping.n_channels_mapping, + dtype=self._dtype, + callback=self.output_callback, + finished_callback=self._finished_callback, + extra_settings=self.output_channel_mapping.extra_settings, + ) + self._stream = ostream + + def initialize_buffer(self) -> None: + """Initialize the output buffer. + Starts the buffer and waits for it to be active. + """ + self.output_buffer._start() + self.output_buffer._is_active.wait() + + @property + def output_buffer(self) -> type[_Buffer] | None: + """The output buffer which is consumed by the device. + """ + return self._output_buffer + + @output_buffer.setter + def output_buffer(self, buffer: type[_Buffer]): + """Set the output buffer which is consumed by the device. + The number of channels and the block size need to match the device settings. + """ + if self._stream_active() or self._buffer_active(): + raise ValueError( + "The device is currently in use and needs to be closed first") + + if buffer.block_size != self.block_size: + raise ValueError( + "The buffer's block size does not match. ", + f"Needs to be {self.block_size}") + + if buffer.n_channels != self.n_channels_output: + raise ValueError( + "The buffer's channel number does not match the channel " + f"mapping. A number of {self.n_channels_output} are currently " + f"used. These are {self.output_channels}") + + self._output_buffer = buffer + + def _buffer_active(self) -> bool: + """Check if the output buffer is active.""" + return False if self._output_buffer is None else self.output_buffer.is_active + + @_Device.block_size.setter + def block_size(self, block_size: int): + """Sets the block size of the device and the output buffer. + Any open stream needs to be closed an re-opened. The output buffer is reset. + """ + if self._stream_active() or self._buffer_active(): + raise ValueError( + "The device is currently in use and needs to be closed first") + self._close_stream() + self._block_size = block_size + self.output_buffer.block_size = block_size + self.initialize() + + @_Device.sampling_rate.setter + def sampling_rate(self, sampling_rate: int): + """Sets the sampling rate of the device. + Any open stream needs to be closed an re-opened. + """ + if self._stream_active() or self._buffer_active(): + raise ValueError( + "The device is currently in use and needs to be closed first") + self.check_settings(sampling_rate=sampling_rate) + self._close_stream() + self._sampling_rate = sampling_rate + self.output_buffer.sampling_rate = sampling_rate + self.initialize() + + @_Device.dtype.setter + def dtype(self, dtype): + """Sets the dtype of the device buffer (portaudio specific). + Any open stream needs to be closed an re-opened. + """ + if self._stream_active() or self._buffer_active(): + raise ValueError( + "The device is currently in use and needs to be closed first") + self._close_stream() + self.check_settings(dtype=dtype) + self._dtype = dtype + self.initialize() + + def _stop_buffer(self): + """Stop the output buffer iteration. + This will raise a StopIteration exception in the buffer. + """ + self._output_buffer._stop() + + def _close_stream(self): + """Close the steam and stop the output buffer. + This will release the soundcard lock. + """ + if self.stream is not None: + self.stream.close() + self._output_buffer._stop(msg=None) + + def start(self): + """Start playback.""" + if self._stream is None: + self.initialize() + self.output_buffer._start() + self.output_buffer._is_active.wait() + super().start() + + def wait(self): + """Wait for the device to finish playback.""" + super().wait() + self.output_buffer._is_finished.wait() + + def __del__(self): + """Destructor for the OutputDevice. + Closes the stream and stops the output buffer. + """ + # Close the stream if it is still active + super().__del__() + + # Stop the buffer if it is still active + if self._buffer_active(): + self._stop_buffer( + 'Output device has been closed. ', + 'Stopping buffer iteration.') diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..89214a2 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,53 @@ +import pytest +from .utils import signal_buffer_stub +from haiopy.buffers import SignalBuffer +import pyfar as pf +import numpy as np + + +@pytest.fixture +def empty_buffer_stub(): + """Create a stub representing an empty ArrayBuffer. + + Returns + ------- + ArrayBuffer + Stub of ArrayBuffer + """ + + block_size = 512 + n_blocks = 10 + data = np.zeros((1, n_blocks*block_size), dtype='float32') + + buffer = signal_buffer_stub(block_size, data) + duration = block_size*n_blocks/buffer.sampling_rate + + return buffer, duration + + +@pytest.fixture +def sine_buffer_stub(): + """Create a stub representing an empty ArrayBuffer. + + Returns + ------- + buffer: SignalBuffer + Stub of SignalBuffer + duration: float + Duration of the buffer in seconds. Required if waiting for the buffer + to finish is required. + + """ + sampling_rate = 44100 + block_size = 512 + n_blocks = 86 + data = np.zeros((1, n_blocks*block_size), dtype='float32') + t = np.arange(0, block_size*n_blocks)/sampling_rate + freq = 440 + data = np.sin(2*np.pi*t*freq)*10**(-6/20) + + data = np.atleast_2d(data).astype('float32') + buffer = SignalBuffer(block_size, pf.Signal(data, sampling_rate)) + duration = block_size*n_blocks/sampling_rate + + return buffer, duration diff --git a/tests/sounddevice_mocks.py b/tests/sounddevice_mocks.py new file mode 100644 index 0000000..4aaef64 --- /dev/null +++ b/tests/sounddevice_mocks.py @@ -0,0 +1,23 @@ +import pytest +from unittest.mock import MagicMock + +import numpy as np +import sounddevice as sd + + +def output_stream_mock(block_size=512, sampling_rate=44100, channels=1): + # pass + stream = MagicMock(spec_set=sd.OutputStream) + stream.samplerate = sampling_rate + stream.blocksize = block_size + stream.device = 0 + stream.channels = channels + stream.dtype = np.float32 + stream.latency = 0.1 + # stream.extra_settings = None + # stream.clip_off = False + # stream.dither_off = False + # stream.never_drop_input = False + # stream.prime_output_buffers_using_stream_callback = False + + return stream diff --git a/tests/test_devices.py b/tests/test_devices.py new file mode 100644 index 0000000..d060beb --- /dev/null +++ b/tests/test_devices.py @@ -0,0 +1,141 @@ +from haiopy import devices +from . import utils +from . import sounddevice_mocks as sdm +from unittest.mock import patch +import sounddevice as sd +import pytest +import numpy as np +from numpy import testing as npt + + +@patch('sounddevice.query_devices', new=utils.query_devices) +def test_audio_device(): + devices.AudioDevice(0) + + +@patch('sounddevice.query_devices', new=utils.query_devices) +@patch('sounddevice.check_output_settings', new=utils.check_output_settings) +@patch('sounddevice.OutputStream', new=sdm.output_stream_mock()) +def test_check_output_settings(empty_buffer_stub): + out_device = devices.OutputAudioDevice( + output_buffer=empty_buffer_stub[0]) + out_device.check_settings() + + +@patch('sounddevice.query_devices', new=utils.query_devices) +@patch('sounddevice.check_output_settings', new=utils.check_output_settings) +@patch('sounddevice.OutputStream', new=sdm.output_stream_mock()) +def test_check_init(empty_buffer_stub): + buffer = empty_buffer_stub[0] + out_device = devices.OutputAudioDevice( + output_buffer=empty_buffer_stub[0]) + out_device.check_settings() + + out_device.output_buffer = buffer + out_device._output_buffer == buffer + out_device.output_buffer == buffer + + # set a buffer with non matching block size + buffer.block_size = 256 + with pytest.raises(ValueError, match='block size does not match'): + out_device.output_buffer = buffer + + # change the block size of the buffer and check if buffers block size is + # set accordingly + new_block_size = 256 + out_device.block_size = new_block_size + out_device._block_size == new_block_size + out_device.output_buffer.block_size == new_block_size + + # set and get sampling rate + out_device.sampling_rate = 44100 + out_device._sampling_rate == 44100 + + # test if setters are blocked when the stream is in use + out_device.stream.active = True + with pytest.raises(ValueError, match='currently in use'): + out_device.block_size = 512 + + # test if setters are blocked when the buffer is in use + out_device.stream.active = False + out_device.output_buffer._start() + with pytest.raises(ValueError, match='currently in use'): + out_device.block_size = 512 + + +@patch('sounddevice.query_devices', new=utils.query_devices) +@patch('sounddevice.check_output_settings', new=utils.check_output_settings) +@patch('sounddevice.OutputStream', new=sdm.output_stream_mock()) +def test_sine_playback(sine_buffer_stub): + buffer = sine_buffer_stub[0] + + config = {'default_samplerate': 44100} + sampling_rate = config['default_samplerate'] + + channels = [1] + + out_device = devices.OutputAudioDevice( + identifier=0, + output_buffer=buffer, + channels=channels, + sampling_rate=sampling_rate) + out_device.check_settings() + + out_device.start() + assert out_device.output_buffer.is_active is True + + # manually call the callback function with an arbitrary outdata array + outdata = np.zeros((512, 2), dtype=np.float32) + # unset all callback flags + status = sd.CallbackFlags() + + # call callback, this would happen in a separate thread controlled by + # portaudio. Once the buffer is empty, the callback should raise an + # sd.CallbackStop exception. + with pytest.raises(sd.CallbackStop, match='Buffer empty'): + bdx = 0 + while True: + out_device.output_callback(outdata, 512, None, status) + npt.assert_allclose( + outdata[:, 1], + buffer._strided_data[0, bdx, :]) + bdx += 1 + + out_device._finished_callback() + assert out_device.output_buffer.is_active is False + + +@patch('sounddevice.query_devices', new=utils.query_devices) +@patch('sounddevice.check_output_settings', new=utils.check_output_settings) +@patch('sounddevice.OutputStream', new=sdm.output_stream_mock()) +def test_callback_errors(sine_buffer_stub): + buffer = sine_buffer_stub[0] + + config = {'default_samplerate': 44100} + sampling_rate = config['default_samplerate'] + + channels = [1] + + out_device = devices.OutputAudioDevice( + identifier=0, + output_buffer=buffer, + channels=channels, + sampling_rate=sampling_rate) + out_device.check_settings() + + out_device.start() + assert out_device.output_buffer.is_active is True + + # manually call the callback function with an arbitrary outdata array + outdata = np.zeros((512, 2), dtype=np.float32) + # unset all callback flags + status = sd.CallbackFlags() + + # No error + out_device.output_callback(outdata, 512, None, status) + + # Buffer underflow + with pytest.raises(sd.CallbackAbort, match='Buffer underflow'): + status = sd.CallbackFlags() + status.output_underflow = True + out_device.output_callback(outdata, 512, None, status) diff --git a/tests/test_devices_physical.py b/tests/test_devices_physical.py new file mode 100644 index 0000000..91ee19b --- /dev/null +++ b/tests/test_devices_physical.py @@ -0,0 +1,188 @@ +# activate ASIO support +import os +os.environ["SD_ENABLE_ASIO"] = "1" +from haiopy import devices +import sounddevice as sd +import pytest + + +def default_device_multiface_fireface(kind='both'): + device_list = sd.query_devices() + found = False + + import platform + + if platform.system() == 'Windows': + valid_devices = [ + 'ASIO Fireface USB', + ] + elif platform.system() == 'Darwin': + valid_devices = [ + 'Multiface', + 'Fireface', + 'Scarlett 2i4', + 'MADIface', + ] + + for valid_device in valid_devices: + for identifier, device in enumerate(device_list): + if valid_device in device['name']: + found = True + break + if not found: + raise ValueError( + "Please connect Fireface or Multiface, or specify test device.") + + sampling_rate = int(device['default_samplerate']) + name = device['name'] + print(f"\n\n Using: {name} with sampling rate = {sampling_rate}\n\n") + + return identifier, device + + +@pytest.mark.skipif(os.environ.get('CI') == 'true', + reason="CI does not have a soundcard") +def test_default_device_helper(): + identifier, device = default_device_multiface_fireface() + device_names = [ + 'Fireface', + 'Multiface', + 'Scarlett 2i4', + 'MADIface', + 'Focusrite USB ASIO', + 'Steinberg USB ASIO', + 'ReaRoute ASIO', + ] + assert any( + name in sd.query_devices(identifier)['name'] for name in device_names) + + fireface = 'Fireface' in sd.query_devices(identifier)['name'] + multiface = 'Multiface' in sd.query_devices(identifier)['name'] + scarlett = 'Scarlett 2i4' in sd.query_devices(identifier)['name'] + madiface = 'MADIface' in sd.query_devices(identifier)['name'] + focusrite = 'Focusrite USB ASIO' in sd.query_devices(identifier)['name'] + steinberg = 'Steinberg USB ASIO' in sd.query_devices(identifier)['name'] + rearoute = 'ReaRoute' in sd.query_devices(identifier)['name'] + +# ----------------------------------------------------------------------------- +# Output Device Tests +# ----------------------------------------------------------------------------- + + +@pytest.mark.skipif(os.environ.get('CI') == 'true', + reason="CI does not have a soundcard") +def test_check_output_settings(empty_buffer_stub): + identifier, config = default_device_multiface_fireface() + channels = [0] + block_size = 512 + + buffer = empty_buffer_stub[0] + sampling_rate = config['default_samplerate'] + + out_device = devices.OutputAudioDevice(identifier=identifier, + sampling_rate=sampling_rate, + block_size=block_size, + channels=channels, + dtype='float32', + output_buffer=buffer) + + # Check sampling rate + out_device.check_settings(sampling_rate=sampling_rate) + with pytest.raises(sd.PortAudioError, match="Invalid"): + out_device.check_settings(sampling_rate=10) + + # Check the dtype, apparently this raises a ValueError if invalid + out_device.check_settings(dtype='float32') + with pytest.raises(ValueError, match="Invalid"): + out_device.check_settings(dtype=float) + + # Check number of channels + out_device.check_settings(n_channels=config['max_output_channels']) + with pytest.raises(sd.PortAudioError, match="Invalid"): + out_device.check_settings(config['max_output_channels']+10) + + # Close Output Stream for next Tests + with pytest.raises(StopIteration, match="iteration stopped"): + out_device.close() + + +@pytest.mark.skipif(os.environ.get('CI') == 'true', + reason="CI does not have a soundcard") +def test_sine_playback(sine_buffer_stub): + + buffer = sine_buffer_stub[0] + identifier, config = default_device_multiface_fireface() + + sampling_rate = int(config['default_samplerate']) + + out_device = devices.OutputAudioDevice( + identifier=identifier, + output_buffer=buffer, + channels=[1], + block_size=buffer.block_size, + sampling_rate=sampling_rate) + out_device.check_settings() + + out_device.start() + assert out_device.output_buffer.is_active is True + out_device.wait() + assert out_device.output_buffer.is_active is False + + # Close Output Stream for next Tests + with pytest.raises(StopIteration, match="iteration stopped"): + out_device.close() + + +@pytest.mark.skipif(os.environ.get('CI') == 'true', + reason="CI does not have a soundcard") +def test_check_init(empty_buffer_stub, sine_buffer_stub): + buffer = sine_buffer_stub[0] + identifier, config = default_device_multiface_fireface() + + sampling_rate = int(config['default_samplerate']) + + out_device = devices.OutputAudioDevice( + identifier=identifier, + output_buffer=empty_buffer_stub[0], + channels=[1], + sampling_rate=sampling_rate) + out_device.check_settings() + assert out_device.output_buffer == empty_buffer_stub[0] + out_device.output_buffer = buffer + assert out_device._output_buffer == buffer + assert out_device.output_buffer == buffer + # test playback with new buffer + out_device.start() + out_device.wait() + + # set new block_size + new_block_size = 1024 + out_device.block_size = new_block_size + assert out_device._block_size == new_block_size + assert out_device.output_buffer.block_size == new_block_size + # test playback with new block_size + out_device.start() + out_device.wait() + + # set new sampling_rate + new_sampling_rate = 88200 if sampling_rate == 44100 else 44100 + out_device.sampling_rate = new_sampling_rate + assert out_device._sampling_rate == new_sampling_rate + assert out_device.sampling_rate == new_sampling_rate + assert out_device.output_buffer.sampling_rate == new_sampling_rate + # test playback with new sampling_rate + out_device.start() + out_device.wait() + + # set new channels + # new_channels = [0, 1] + # out_device.channels = new_channels + # out_device._output_buffer.reset_index() + # assert out_device.channels == new_channels + # # test playback with new channels + # out_device.start() + # out_device.wait() + + # Close Output Stream for next Tests + with pytest.raises(StopIteration, match="iteration stopped"): + out_device.close() diff --git a/tests/test_mappings.py b/tests/test_mappings.py new file mode 100644 index 0000000..ef2aac9 --- /dev/null +++ b/tests/test_mappings.py @@ -0,0 +1,53 @@ +from unittest.mock import patch +import numpy as np +import pytest + +from haiopy.devices import ( + OutputChannelMapping, InputChannelMapping) + + +@pytest.mark.parametrize( + 'valid_apis', [ + 'windows wasapi', + 'windows directsound', + 'asio', + 'windows wdm-ks']) +@patch('platform.system', new=lambda: 'Windows') +@patch('sounddevice.AsioSettings', new=lambda channel_selectors: None) +def test_init_checks_windows(valid_apis): + OutputChannelMapping([0], 1, valid_apis) + InputChannelMapping([0], 1, valid_apis) + + +@pytest.mark.parametrize( + 'valid_apis', [ + 'alsa', + 'oss', + 'pulse', + 'jack']) +@patch('platform.system', new=lambda: 'Linux') +def test_init_checks_linux(valid_apis): + OutputChannelMapping([0], 1, valid_apis) + InputChannelMapping([0], 1, valid_apis) + + +@pytest.mark.parametrize( + 'valid_apis', ['coreaudio']) +@patch('platform.system', new=lambda: 'Darwin') +@patch('sounddevice.CoreAudioSettings', new=lambda channel_map: None) +def test_init_checks_macos(valid_apis): + OutputChannelMapping([0], 1, valid_apis) + InputChannelMapping([0], 1, valid_apis) + + +@patch('platform.system', new=lambda: 'Linux') +def test_manual_mapping(): + output_mapping = OutputChannelMapping([2, 3], 4, 'alsa') + + data_block = np.ones((2, 512), dtype=float) + truth = np.vstack(( + np.zeros_like(data_block), + data_block, + )) + np.testing.assert_array_equal( + output_mapping(data_block), truth) diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..1695fdf --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,91 @@ +import pytest +from unittest import mock +import numpy as np +import pyfar as pf +from haiopy.buffers import SignalBuffer + + +def default_devices(): + return [0, 0] + + +def query_devices(id=None, kind=None): + if kind == 'input': + return { + 'name': "MockDevice", + 'index': 0, + 'hostapi': 'CoreAudio', + 'max_input_channels': 8, + 'default_low_input_latency': 0.1, + 'default_high_input_latency': 0.15, + 'default_samplerate': 44100 + } + elif kind == 'output': + return { + 'name': "MockInputDevice", + 'index': 0, + 'hostapi': 'CoreAudio', + 'max_output_channels': 8, + 'default_low_output_latency': 0.1, + 'default_high_output_latency': 0.15, + 'default_samplerate': 44100 + } + else: + return { + 'name': "MockOutput", + 'index': 0, + 'hostapi': 'CoreAudio', + 'max_input_channels': 8, + 'max_output_channels': 8, + 'default_low_input_latency': 0.1, + 'default_low_output_latency': 0.1, + 'default_high_input_latency': 0.15, + 'default_high_output_latency': 0.15, + 'default_samplerate': 44100 + } + + +def supported_mock_device_parameters(): + return { + 'samplerate': [44.1e3, 48e3, 2*44.1e3, 96e3, 192e3], + 'dtype': ['float32'], + 'channels': [8]} + + +def check_output_settings( + device=None, + channels=None, + dtype=None, + extra_settings=None, + samplerate=None): + """So far this only passes for all settings""" + pass + + +def check_input_settings( + device=None, + channels=None, + dtype=None, + extra_settings=None, + samplerate=None): + """So far this only passes for all settings""" + pass + + +def signal_buffer_stub(block_size=512, data=np.zeros((1, 512))): + """Generate a ArrayBuffer Stub with given block size and data + + Parameters + ---------- + block_size : int + Block size for the sound card callback + data : array_like, float32, int24, int16, int8 + The data of the buffer + """ + if np.mod(data.shape[-1], block_size) != 0: + raise ValueError( + 'The data needs to be an integer multiple of the block size') + + sig = pf.Signal(data, 44100, fft_norm='rms') + + return SignalBuffer(block_size, sig)