Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 203 additions & 1 deletion haiopy/buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from threading import Event
from scipy import signal
import warnings

import matplotlib.pyplot as plt

class _Buffer(ABC):
"""Abstract base class for audio buffers for block-wise iteration.
Expand Down Expand Up @@ -538,3 +538,205 @@ def next(self):

def _reset(self):
super()._reset()

class LinearSweepGenerator(_Buffer):
"""
Generator to block wise calculate a linear sweep as described in [#]_.

Examples:
--------

>>> from haiopy.buffers import LinearSweepGenerator
>>> import matplotlib.pyplot as plt
>>> sine = LinearSweepGenerator(440, 128)
>>> blocks = [next(LinearSweepGenerator), next(LinearSweepGenerator), next(LinearSweepGenerator)]
>>> for block in blocks:
>>> plt.plot((block))
>>> plt.show()

Source:
.. [#] Farina, Angelo (2000): "Simultaneous measurement of impulse
response and distortion with a swept-sine technique." 108th AES
Convention, Paris: France.
"""

def __init__(self,
block_size,
amplitude=1,
sweep_duration = 10,
f_1 = 0,
f_2 = 22050,
sampling_rate=44100) -> None:
"""
Initialize a `LinearSweepGenerator`with a given block_size,
amplitude, sweep duration, lower frequency, upper frequency
and samplingrate.

Parameters
----------
block_size : int
The block size in samples.
amplitude: double, optional
The amplitude of the sine. The default is ``1``.
sweep_duration : float, optional
The duration of one sweep in seconds. The default is ``10`` s
f_1 : int, optional
The starting frequency for the sweep in Hz. The default is
``0`` Hz.
f_2: int, optional
The ending frequency for the sweep in Hz. The default is
``22050`` Hz.
sampling_rate : int, optional
The sampling rate in Hz. The default is ``44100``.
"""
super().__init__(block_size)
self._amplitude = amplitude
self._sampling_rate = sampling_rate
self._T = sweep_duration
self._f_1 = f_1

if f_2 > sampling_rate/2:
self.f_2 = np.floor(sampling_rate/2)
else:
self._f_2 = f_2

self._t_start = 0
self._in_transition = False

@property
def amplitude(self):
"""Return the amplitude of the sweep."""
return self._amplitude

@amplitude.setter
def amplitude(self, amplitude):
self.check_if_active()
self._amplitude = amplitude
self._reset()

@property
def sampling_rate(self):
"""Return the sampling rate of the generated sweep."""
return self._sampling_rate

@sampling_rate.setter
def sampling_rate(self, sampling_rate):
"""Set the sampling rate."""
self.check_if_active()
self._sampling_rate = sampling_rate
self._phase = 0

@property
def n_channels(self):
"""Return the number of channels. This is currently always 1."""
return 1

@property
def sweep_duration(self):
"""Return the duration for one sweep."""
return self._T

@sweep_duration.setter
def sweep_duration(self, sweep_duration):
"""Set the duration of one sweep."""
self.check_if_active()
self._sweep_duration = sweep_duration
self._T = sweep_duration

@property
def T(self):
"""Return the period time T for one sweep."""
return self._T

@property
def f_1(self):
"""Return lower freuqency limit for sweep."""
return self._f_1

@f_1.setter
def f_1(self, f_1):
"""Set the lower frequency of the sweep."""
self.check_if_active()
self._f_1 = f_1
self._reset()

@property
def f_2(self):
"""Return upper freuqency limit for sweep."""
return self._f_2

@f_2.setter
def f_2(self, f_2):
"""Set the upper frequency of the sweep."""
self.check_if_active()
self._f_2 = f_2
self._reset()

@property
def t_start(self):
"""Return current time."""
return self._t_start

def _set_block_size(self, block_size):
"""Set blocksize."""
self.check_if_active()
super()._set_block_size(block_size)
self._reset()

def next(self):
"""
Return the next audio block as numpy array and increase the current
time variable by one block.
If sweep ends inside a block, it fades out and a new sweep begins.
"""

# if in transition to a new sweep-start (i.e. sweep ends within
# a signal block, not directly at the end)
if self._in_transition:
time_till_end = self._T - self._t_start
n_samples_1 = int(time_till_end * self._sampling_rate)
t_1 = np.arange(n_samples_1) + self._t_start
self._reset()
n_samples_2 = self._block_size - n_samples_1
t_2 = np.arange(n_samples_2) / self._sampling_rate
t = np.concatenate((t_1, t_2))
else: # usual iteration
n_samples = int(self._block_size)
t = np.arange(n_samples) / self._sampling_rate + self._t_start

# [1, page 5]
w_1 = 2 * np.pi * self._f_1
w_2 = 2 * np.pi * self._f_2
data = (self._amplitude *
np.sin(w_1 * t + (w_2-w_1) / self._T * t**2 / 2))

self._t_start += self._block_size / self._sampling_rate

# window end of first sweep to reach zero at the end:
# ...and set t_start new:
if self._in_transition:
win_len = 11
win = np.hanning(win_len)
win_half = win[-int(win_len/2):]
idx = n_samples_1-len(win_half)
data[idx:n_samples_1] = data[idx:n_samples_1]*win_half

# set start time based on t_2:
# in next iterations t_start can be increased by block/fs again as
# usual
self._t_start = t_2[-1] + 1/self._sampling_rate
self._in_transition = False

# if we reach desired duration of one sweep, do....:
if (self._t_start >=
(self._T - (self._block_size / self._sampling_rate))):
if self._t_start == self._T:
self._reset()
else:
self._in_transition = True

return data

def _reset(self):
"""Reset sweep."""
self._t_start = 0
55 changes: 53 additions & 2 deletions tests/test_buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
SignalBuffer,
EmptyBuffer
)
from haiopy.buffers import SineGenerator, NoiseGenerator
from haiopy.buffers import SineGenerator, NoiseGenerator, LinearSweepGenerator
import pytest
import pyfar as pf
from scipy import signal
Expand Down Expand Up @@ -260,7 +260,6 @@ def test_SineGenerator():
# check if sine generator is active now
assert sine.is_active is True


def test_SineGenerator_updates():
frequency = 440
block_size = 512
Expand Down Expand Up @@ -434,6 +433,58 @@ def test_NoiseGenerator_updates():
with pytest.raises(BufferError, match="needs to be inactive"):
noise.seed = 123

def test_LinearSweepGenerator():
block_size = 512
amplitude = 0.1
sampling_rate = 44100
f1 = 0
f2 = 20000
sweep_duration = 2

sweep = LinearSweepGenerator(block_size,
amplitude,
sweep_duration = sweep_duration,
f_1 = f1,
f_2 = f2,
sampling_rate=sampling_rate)

# test getters with default
assert sweep.block_size == block_size
assert sweep.sampling_rate == 44100
assert sweep.amplitude == amplitude
assert sweep.f_1 == f1
assert sweep.f_2 == f2
assert sweep.sweep_duration == sweep_duration

assert sweep.n_channels == 1

# check if sweep generator is not active yet
assert sweep.is_active is False

# check first block
t_start = 0
n_samples = int(block_size)
t = np.arange(n_samples) / sampling_rate + t_start
w_1 = 2 * np.pi * f1
w_2 = 2 * np.pi * f2
sweep_data = (amplitude *
np.sin(w_1 * t + (w_2-w_1) / sweep_duration * t**2 / 2))
block_data = next(sweep)
npt.assert_array_equal(block_data, sweep_data)

# check if noise generator is active now
assert sweep.is_active is True

# check second block
t_start += block_size / sampling_rate
t = np.arange(n_samples) / sampling_rate + t_start
w_1 = 2 * np.pi * f1
w_2 = 2 * np.pi * f2
sweep_data = (amplitude *
np.sin(w_1 * t + (w_2-w_1) / sweep_duration * t**2 / 2))

block_data = next(sweep)
npt.assert_array_equal(block_data, sweep_data)

def test_sampling_rate_setter():
# Test setting the sampling rate, resampling the Signal and updating data
Expand Down
Loading