diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 47acdf5..0000000 Binary files a/.DS_Store and /dev/null differ diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..d0d6f98 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +"""Init file for tests.""" diff --git a/adc_eval/eval/calc.py b/adc_eval/eval/calc.py new file mode 100644 index 0000000..80d7e61 --- /dev/null +++ b/adc_eval/eval/calc.py @@ -0,0 +1,152 @@ +"""Spectral analysis helper module.""" + +import numpy as np + + +def db_to_pow(value, places=3): + """Convert dBW to W.""" + if isinstance(value, np.ndarray): + return np.round(10 ** (0.1 * value), places) + return round(10 ** (0.1 * value), places) + + +def dBW(value, places=1): + """Convert to dBW.""" + if isinstance(value, np.ndarray): + return np.round(10 * np.log10(value), places) + return round(10 * np.log10(value), places) + + +def enob(sndr, places=1): + """Return ENOB for given SNDR.""" + return round((sndr - 1.76) / 6.02, places) + + +def sndr_sfdr(spectrum, freq, fs, nfft, leak, full_scale=0): + """Get SNDR and SFDR.""" + + # Zero the DC bin + for i in range(0, leak + 1): + spectrum[i] = 0 + bin_sig = np.argmax(spectrum) + psig = sum(spectrum[i] for i in range(bin_sig - leak, bin_sig + leak + 1)) + spectrum_n = spectrum.copy() + spectrum_n[bin_sig] = 0 + fbin = fs / nfft + + for i in range(bin_sig - leak, bin_sig + leak + 1): + spectrum_n[i] = 0 + + bin_spur = np.argmax(spectrum_n) + pspur = spectrum[bin_spur] + + noise_power = sum(spectrum_n) + noise_floor = 2 * noise_power / nfft + + stats = {} + + stats["sig"] = { + "freq": freq[bin_sig], + "bin": bin_sig, + "power": psig, + "dB": dBW(psig), + "dBFS": round(dBW(psig) - full_scale, 1), + } + + stats["spur"] = { + "freq": freq[bin_spur], + "bin": bin_spur, + "power": pspur, + "dB": dBW(pspur), + "dBFS": round(dBW(pspur) - full_scale, 1), + } + stats["noise"] = { + "floor": noise_floor, + "power": noise_power, + "rms": np.sqrt(noise_power), + "dBHz": round(dBW(noise_floor, 3) - full_scale, 1), + "NSD": round(dBW(noise_floor, 3) - full_scale - 2 * dBW(fbin, 3), 1), + } + stats["sndr"] = { + "dBc": dBW(psig / noise_power), + "dBFS": round(full_scale - dBW(noise_power), 1), + } + stats["sfdr"] = { + "dBc": dBW(psig / pspur), + "dBFS": round(full_scale - dBW(pspur), 1), + } + stats["enob"] = {"bits": enob(stats["sndr"]["dBFS"])} + + return stats + + +def find_harmonics(spectrum, freq, nfft, bin_sig, psig, harms=5, leak=20, fscale=1e6): + """Get the harmonic contents of the data.""" + harm_stats = {"harm": {}} + harm_index = 2 + for harm in bin_sig * np.arange(2, harms + 1): + harm_stats["harm"][harm_index] = {} + zone = np.floor(harm / (nfft / 2)) + 1 + if zone % 2 == 0: + bin_harm = int(nfft / 2 - (harm - (zone - 1) * nfft / 2)) + else: + bin_harm = int(harm - (zone - 1) * nfft / 2) + + # Make sure we pick the max bin where power is maximized; due to spectral leakage + # if bin_harm == nfft/2, set to bin of 0 + if bin_harm == nfft / 2: + bin_harm = 0 + pwr_max = spectrum[bin_harm] + bin_harm_max = bin_harm + for i in range(bin_harm - leak, bin_harm + leak + 1): + try: + pwr = spectrum[i] + if pwr > pwr_max: + bin_harm_max = i + pwr_max = pwr + except IndexError: + # bin + leakage out of bounds, so stop looking + break + + harm_stats["harm"][harm_index]["bin"] = bin_harm_max + harm_stats["harm"][harm_index]["power"] = pwr_max + harm_stats["harm"][harm_index]["freq"] = round(freq[bin_harm] / fscale, 1) + harm_stats["harm"][harm_index]["dBc"] = dBW(pwr_max / psig) + harm_stats["harm"][harm_index]["dB"] = dBW(pwr_max) + + harm_index = harm_index + 1 + + return harm_stats + + +def get_plot_string(stats, full_scale, fs, nfft, window, xscale=1e6, fscale="MHz"): + """Generate plot string from stats dict.""" + + plt_str = "==== FFT ====\n" + plt_str += f"NFFT = {nfft}\n" + plt_str += f"fbin = {round(fs/nfft / 1e3, 2)} kHz\n" + plt_str += f"window = {window}\n" + plt_str += "\n" + plt_str += "==== Signal ====\n" + plt_str += f"FullScale = {full_scale} dB\n" + plt_str += f"Psig = {stats['sig']['dBFS']} dBFS ({stats['sig']['dB']} dB)\n" + plt_str += f"fsig = {round(stats['sig']['freq']/xscale, 2)} {fscale}\n" + plt_str += f"fsamp = {round(fs/xscale, 2)} {fscale}\n" + plt_str += "\n" + plt_str += "==== SNDR/SFDR ====\n" + plt_str += f"ENOB = {stats['enob']['bits']} bits\n" + plt_str += f"SNDR = {stats['sndr']['dBFS']} dBFS ({stats['sndr']['dBc']} dBc)\n" + plt_str += f"SFDR = {stats['sfdr']['dBFS']} dBFS ({stats['sfdr']['dBc']} dBc)\n" + plt_str += f"Pspur = {stats['spur']['dBFS']} dBFS\n" + plt_str += f"fspur = {round(stats['spur']['freq']/xscale, 2)} {fscale}\n" + plt_str += f"Noise Floor = {stats['noise']['dBHz']} dBFS\n" + plt_str += f"NSD = {stats['noise']['NSD']} dBFS\n" + plt_str += "\n" + plt_str += "==== Harmonics ====\n" + + for hindex, hdata in stats["harm"].items(): + plt_str += f"HD{hindex} = {round(hdata['dB'] - full_scale, 1)} dBFS @ {hdata['freq']} {fscale}\n" + + plt_str += "\n" + + return plt_str diff --git a/adc_eval/eval/spectrum.py b/adc_eval/eval/spectrum.py index 6e2c13e..757bcf7 100644 --- a/adc_eval/eval/spectrum.py +++ b/adc_eval/eval/spectrum.py @@ -2,122 +2,7 @@ import numpy as np import matplotlib.pyplot as plt - - -def db_to_pow(value, places=3): - """Convert dBW to W.""" - if isinstance(value, np.ndarray): - return 10 ** (0.1 * value) - return round(10 ** (0.1 * value), places) - - -def dBW(value, places=1): - """Convert to dBW.""" - if isinstance(value, np.ndarray): - return 10 * np.log10(value) - return round(10 * np.log10(value), places) - - -def enob(sndr, places=1): - """Return ENOB for given SNDR.""" - return round((sndr - 1.76) / 6.02, places) - - -def sndr_sfdr(spectrum, freq, fs, nfft, leak, full_scale=0): - """Get SNDR and SFDR.""" - - # Zero the DC bin - for i in range(0, leak + 1): - spectrum[i] = 0 - bin_sig = np.argmax(spectrum) - psig = sum(spectrum[i] for i in range(bin_sig - leak, bin_sig + leak + 1)) - spectrum_n = spectrum - spectrum_n[bin_sig] = 0 - fbin = fs / nfft - - for i in range(bin_sig - leak, bin_sig + leak + 1): - spectrum_n[i] = 0 - - bin_spur = np.argmax(spectrum_n) - pspur = spectrum[bin_spur] - - noise_power = sum(spectrum_n) - noise_floor = 2 * noise_power / nfft - - stats = {} - - stats["sig"] = { - "freq": freq[bin_sig], - "bin": bin_sig, - "power": psig, - "dB": dBW(psig), - "dBFS": round(dBW(psig) - full_scale, 1), - } - - stats["spur"] = { - "freq": freq[bin_spur], - "bin": bin_spur, - "power": pspur, - "dB": dBW(pspur), - "dBFS": round(dBW(pspur) - full_scale, 1), - } - stats["noise"] = { - "floor": noise_floor, - "power": noise_power, - "rms": np.sqrt(noise_power), - "dBHz": round(dBW(noise_floor, 3) - full_scale, 1), - "NSD": round(dBW(noise_floor, 3) - full_scale - 2 * dBW(fbin, 3), 1), - } - stats["sndr"] = { - "dBc": dBW(psig / noise_power), - "dBFS": round(full_scale - dBW(noise_power), 1), - } - stats["sfdr"] = { - "dBc": dBW(psig / pspur), - "dBFS": round(full_scale - dBW(pspur), 1), - } - stats["enob"] = {"bits": enob(stats["sndr"]["dBFS"])} - - return stats - - -def find_harmonics(spectrum, freq, nfft, bin_sig, psig, harms=5, leak=20, fscale=1e6): - """Get the harmonic contents of the data.""" - harm_stats = {"harm": {}} - harm_index = 2 - for harm in bin_sig * np.arange(2, harms + 1): - harm_stats["harm"][harm_index] = {} - zone = np.floor(harm / (nfft / 2)) + 1 - if zone % 2 == 0: - bin_harm = int(nfft / 2 - (harm - (zone - 1) * nfft / 2)) - else: - bin_harm = int(harm - (zone - 1) * nfft / 2) - - # Make sure we pick the max bin where power is maximized; due to spectral leakage - # if bin_harm == nfft/2, set to bin of 0 - if bin_harm == nfft / 2: - bin_harm = 0 - pwr_max = spectrum[bin_harm] - bin_harm_max = bin_harm - for i in range(bin_harm - leak, bin_harm + leak + 1): - try: - pwr = spectrum[i] - if pwr > pwr_max: - bin_harm_max = i - pwr_max = pwr - except IndexError: - # bin + leakage out of bounds, so stop looking - break - - harm_stats["harm"][harm_index]["bin"] = bin_harm_max - harm_stats["harm"][harm_index]["power"] = pwr_max - harm_stats["harm"][harm_index]["freq"] = round(freq[bin_harm] / fscale, 1) - harm_stats["harm"][harm_index]["dBc"] = dBW(pwr_max / psig) - harm_stats["harm"][harm_index]["dB"] = dBW(pwr_max) - - harm_index = harm_index + 1 - - return harm_stats +from adc_eval.eval import calc def calc_psd(data, fs, nfft=2**12): @@ -151,6 +36,28 @@ def get_spectrum(data, fs=1, nfft=2**12, single_sided=True): return (freq_ds, psd_ds * fs / nfft) +def window_data(data, window="rectangular"): + """Applies a window to the time-domain data.""" + try: + wsize = data.size + except AttributeError: + data = np.array(data) + wsize = data.size + + windows = { + "rectangular": (np.ones(wsize), 1.0), + "hanning": (np.hanning(wsize), 1.633), + } + + if window not in windows: + print(f"WARNING: {window} not implemented. Defaulting to 'rectangular'.") + window = "rectangular" + + wscale = windows[window][1] + + return data * windows[window][0] * wscale + + def plot_spectrum( data, fs=1, @@ -162,67 +69,45 @@ def plot_spectrum( no_plot=False, yaxis="power", single_sided=True, - fscale="MHz", + fscale=("MHz", 1e6), ): """Plot Power Spectrum for input signal.""" - wsize = data.size - windows = { - "rectangular": np.ones(wsize), - "hanning": np.hanning(wsize), - } + (freq, pwr) = get_spectrum(data, fs=fs, nfft=nfft, single_sided=single_sided) - fscalar = { - "Hz": 1, - "kHz": 1e3, - "MHz": 1e6, - "GHz": 1e9, - } - - if window not in windows: - print(f"WARNING: {window} not implemented. Defaulting to 'rectangular'.") - window = "rectangular" - - if fscale not in fscalar: - print(f"WARNING: {fscale} not implemented. Defaulting to 'MHz'.") - fscale = "MHz" - - wscale = { - "rectangular": 1.0, - "hanning": 1.633, - }[window] - - (freq, pwr) = get_spectrum( - data * windows[window] * wscale, fs=fs, nfft=nfft, single_sided=single_sided - ) - full_scale = dBW(dr**2 / 8) + # Calculate the fullscale range of the spectrum in Watts + full_scale = calc.dBW(dr**2 / 8) + # Determine what y-axis scaling to use yaxis_lut = { "power": [0, "dB"], - "fullscale": [dBW(dr**2 / 8), "dBFS"], - "normalize": [max(dBW(pwr)), "dB Normalized"], + "fullscale": [full_scale, "dBFS"], + "normalize": [max(calc.dBW(pwr)), "dB Normalized"], "magnitude": [0, "W"], } lut_key = yaxis.lower() scalar = yaxis_lut[lut_key][0] yunits = yaxis_lut[lut_key][1] - xscale = fscalar[fscale] + xscale = fscale[1] - psd_out = 10 * np.log10(pwr) - scalar + # Convert to dBW and perform scalar based on y-axis scaling input + psd_out = calc.dBW(pwr, places=3) - scalar + + # Use Watts if magnitude y-axis scaling is desired if lut_key in ["magnitude"]: psd_out = pwr + # Get single-sided spectrum for consistent SNDR and harmonic calculation behavior f_ss = freq psd_ss = pwr if not single_sided: # Get single-sided spectrum for SNDR and Harmonic stats - (f_ss, psd_ss) = get_spectrum( - data * windows[window] * wscale, fs=fs, nfft=nfft, single_sided=True - ) + (f_ss, psd_ss) = get_spectrum(data, fs=fs, nfft=nfft, single_sided=True) - sndr_stats = sndr_sfdr(psd_ss, f_ss, fs, nfft, leak=leak, full_scale=full_scale) - - harm_stats = find_harmonics( + sndr_stats = calc.sndr_sfdr( + psd_ss, f_ss, fs, nfft, leak=leak, full_scale=full_scale + ) + harm_stats = calc.find_harmonics( psd_ss, f_ss, nfft, @@ -233,16 +118,21 @@ def plot_spectrum( fscale=xscale, ) + # Merge the two stat dictionaries into one for convenient access stats = {**sndr_stats, **harm_stats} + # Change the x-axis minimum value based on single or dual-sided selection xmin = 0 if single_sided else -fs / 2e6 + # If plotting, prep plot and generate all required axis strings if not no_plot: - plt_str = get_plot_string(stats, full_scale, fs, nfft, window, xscale, fscale) + plt_str = calc.get_plot_string( + stats, full_scale, fs, nfft, window, xscale, fscale[0] + ) fig, ax = plt.subplots(figsize=(15, 8)) ax.plot(freq / xscale, psd_out) ax.set_ylabel(f"Power Spectrum ({yunits})", fontsize=18) - ax.set_xlabel(f"Frequency ({fscale})", fontsize=16) + ax.set_xlabel(f"Frequency ({fscale[0]})", fontsize=16) ax.set_title("Output Power Spectrum", fontsize=16) ax.set_xlim([xmin, fs / (2 * xscale)]) ax.set_ylim([1.1 * min(psd_out), 1]) @@ -307,39 +197,6 @@ def plot_spectrum( return (freq, psd_out, stats) -def get_plot_string(stats, full_scale, fs, nfft, window, xscale=1e6, fscale="MHz"): - """Generate plot string from stats dict.""" - - plt_str = "==== FFT ====\n" - plt_str += f"NFFT = {nfft}\n" - plt_str += f"fbin = {round(fs/nfft / 1e3, 2)} kHz\n" - plt_str += f"window = {window}\n" - plt_str += "\n" - plt_str += "==== Signal ====\n" - plt_str += f"FullScale = {full_scale} dB\n" - plt_str += f"Psig = {stats['sig']['dBFS']} dBFS ({stats['sig']['dB']} dB)\n" - plt_str += f"fsig = {round(stats['sig']['freq']/xscale, 2)} {fscale}\n" - plt_str += f"fsamp = {round(fs/xscale, 2)} {fscale}\n" - plt_str += "\n" - plt_str += "==== SNDR/SFDR ====\n" - plt_str += f"ENOB = {stats['enob']['bits']} bits\n" - plt_str += f"SNDR = {stats['sndr']['dBFS']} dBFS ({stats['sndr']['dBc']} dBc)\n" - plt_str += f"SFDR = {stats['sfdr']['dBFS']} dBFS ({stats['sfdr']['dBc']} dBc)\n" - plt_str += f"Pspur = {stats['spur']['dBFS']} dBFS\n" - plt_str += f"fspur = {round(stats['spur']['freq']/xscale, 2)} {fscale}\n" - plt_str += f"Noise Floor = {stats['noise']['dBHz']} dBFS\n" - plt_str += f"NSD = {stats['noise']['NSD']} dBFS\n" - plt_str += "\n" - plt_str += "==== Harmonics ====\n" - - for hindex, hdata in stats["harm"].items(): - plt_str += f"HD{hindex} = {round(hdata['dB'] - full_scale, 1)} dBFS @ {hdata['freq']} {fscale}\n" - - plt_str += "\n" - - return plt_str - - def analyze( data, nfft, @@ -354,8 +211,24 @@ def analyze( fscale="MHz", ): """Perform spectral analysis on input waveform.""" + fscalar = { + "uHz": 1e-6, + "mHz": 1e-3, + "Hz": 1, + "kHz": 1e3, + "MHz": 1e6, + "GHz": 1e9, + "THz": 1e12, + } + if fscale not in fscalar: + print(f"WARNING: {fscale} not implemented. Defaulting to 'MHz'.") + fscale = "MHz" + + # Window the data + wdata = window_data(data, window=window) + (freq, spectrum, stats) = plot_spectrum( - data, + wdata, fs=fs, nfft=nfft, dr=dr, @@ -365,7 +238,7 @@ def analyze( no_plot=no_plot, yaxis=yaxis, single_sided=single_sided, - fscale=fscale, + fscale=(fscale, fscalar[fscale]), ) return (freq, spectrum, stats) diff --git a/adc_eval/filter.py b/adc_eval/filt.py similarity index 87% rename from adc_eval/filter.py rename to adc_eval/filt.py index 53e8c7b..bc27cee 100644 --- a/adc_eval/filter.py +++ b/adc_eval/filt.py @@ -3,7 +3,9 @@ import numpy as np from scipy.signal import remez, freqz import matplotlib.pyplot as plt +from adc_eval import signals from adc_eval.eval import spectrum +from adc_eval.eval import calc class CICDecimate: @@ -37,13 +39,35 @@ class CICDecimate: def __init__(self, dec=2, order=1, fs=1): """Initialize the CIC filter.""" - self.dec = dec - self.order = order + self._dec = dec + self._order = order self.fs = fs - self.gain = self.dec**self.order + self.gain = dec**order self._xout = None self._xfilt = None + @property + def dec(self): + """Returns the decimation factor.""" + return self._dec + + @dec.setter + def dec(self, value): + """Sets the decimation factor.""" + self._dec = value + self.gain = value**self._order + + @property + def order(self): + """Returns the order of the filter.""" + return self._order + + @order.setter + def order(self, value): + """Sets the filter order.""" + self._order = value + self.gain = self.dec**value + @property def out(self): """Filtered and decimated output data.""" @@ -69,9 +93,11 @@ def filt(self, xarray): self._xfilt = ycomb / self.gain - def decimate(self): + def decimate(self, xarray=None): """decimation routine.""" - self._xout = self._xfilt[:: self.dec] + if xarray is None: + xarray = self._xfilt + self._xout = xarray[:: self.dec] def run(self, xarray): """Runs filtering and decimation on input list.""" @@ -80,8 +106,7 @@ def run(self, xarray): def response(self, fft, no_plot=False): """Plots the frequency response of the pre-decimated filter.""" - xin = np.zeros(fft) - xin[0] = 1 + xin = signals.impulse(fft) self.filt(xin) (freq, psd, stats) = spectrum.analyze( self._xfilt * fft / np.sqrt(2), @@ -146,7 +171,7 @@ def __init__(self, dec=1, fs=1, bit_depth=16, coeffs=None): self.dec = dec self.fs = fs self.bit_depth = bit_depth - self.ntaps = np.size(coeffs) if coeffs is None else 0 + self.ntaps = np.size(coeffs) if coeffs is not None else 0 self.yfilt = None self._out = None @@ -215,7 +240,7 @@ def run(self, xarray): def response(self, fft, no_plot=False): """Plots the frequency response of the pre-decimated filter.""" freq, mag = freqz(self.coeffs, [1], worN=fft, fs=self.fs) - yfft = spectrum.dBW(np.abs(mag)) + yfft = calc.dBW(np.abs(mag)) if not no_plot: fig, ax = plt.subplots(figsize=(15, 8)) ax.plot(freq / 1e6, yfft) diff --git a/adc_eval/signals.py b/adc_eval/signals.py index 002e019..aaf6874 100644 --- a/adc_eval/signals.py +++ b/adc_eval/signals.py @@ -16,3 +16,10 @@ def sin(t, amp=0.5, offset=0.5, freq=1e3, ph0=0): def noise(t, mean=0, std=0.1): """Generate random noise.""" return np.random.normal(mean, std, size=len(t)) + + +def impulse(nsamp, mag=1): + """Generate an impulse input.""" + data = np.zeros(nsamp) + data[0] = mag + return data diff --git a/examples/basic_adc_simulation.py b/examples/basic_adc_simulation.py index 914615d..d5663c4 100644 --- a/examples/basic_adc_simulation.py +++ b/examples/basic_adc_simulation.py @@ -65,6 +65,7 @@ no_plot=False, yaxis="fullscale", single_sided=True, + fscale="MHz" ) ax = plt.gca() ax.set_title("ADC Spectrum") diff --git a/tests/eval/__init__.py b/tests/eval/__init__.py new file mode 100644 index 0000000..c9ae6f7 --- /dev/null +++ b/tests/eval/__init__.py @@ -0,0 +1 @@ +"""Init file for eval tests.""" diff --git a/tests/eval/test_calc.py b/tests/eval/test_calc.py new file mode 100644 index 0000000..a0a627a --- /dev/null +++ b/tests/eval/test_calc.py @@ -0,0 +1,261 @@ +"""Test the eval.calc functions.""" + +import pytest +import numpy as np +from unittest import mock +from adc_eval.eval import calc +from adc_eval.eval import spectrum + + +RTOL = 0.05 +NLEN = 2**16 +AMPLITUDE = 0.5 / np.sqrt(2) +RAND_HARMS = 4 +RAND_NFFT = 3 +RAND_LEAK = 3 +TEST_SNDR = [ + np.random.uniform(low=0.1, high=100, size=np.random.randint(4, 31)) + for _ in range(10) +] +TEST_VALS = [np.random.uniform(low=0.1, high=50) for _ in range(3)] +PLACES = [i for i in range(6)] + + +def gen_spectrum(sig_bin, harmonics, nfft): + """Generate a wave with arbitrary harmonics.""" + t = np.linspace(0, NLEN - 1, NLEN) + vin = np.zeros(len(t)) + fin = sig_bin / nfft + for i in range(1, harmonics + 1): + vin += np.sqrt(2) * AMPLITUDE / i * np.sin(2 * np.pi * i * fin * t) + + return spectrum.get_spectrum(vin, fs=1, nfft=nfft) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_db_to_pow_places(data, places): + """Test the db_to_pow conversion with multiple places.""" + exp_val = round(10 ** (data / 10), places) + assert exp_val == calc.db_to_pow(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_db_to_pow_ndarray(data, places): + """Test db_to_pow with ndarray input.""" + data = np.array(data) + exp_val = np.array(round(10 ** (data / 10), places)) + assert exp_val == calc.db_to_pow(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_dbW(data, places): + """Test the dbW conversion with normal inputs.""" + exp_val = round(10 * np.log10(data), places) + assert exp_val == calc.dBW(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_dbW_ndarray(data, places): + """Test dbW with ndarray input.""" + data = np.array(data) + exp_val = np.array(round(10 * np.log10(data), places)) + assert exp_val == calc.dBW(data, places=places) + + +@pytest.mark.parametrize("data", TEST_VALS) +@pytest.mark.parametrize("places", PLACES) +def test_enob(data, places): + """Test enob with muliple places.""" + exp_val = round(1 / 6.02 * (data - 1.76), places) + assert exp_val == calc.enob(data, places=places) + + +@pytest.mark.parametrize("harms", np.random.randint(1, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(10, 16, RAND_NFFT))) +def test_find_harmonics(harms, nfft): + """Test the find harmonics method.""" + nbin = np.random.randint(1, int(nfft / (2 * harms)) - 1) + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=0, fscale=1e-6 + ) + + for n in range(2, harms + 1): + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, index={n}" + exp_bin = n * nbin + exp_power = (AMPLITUDE / n) ** 2 + exp_freq = freq[exp_bin] * 1e6 + assert stats["harm"][n]["bin"] == exp_bin, msg_txt + assert np.allclose(stats["harm"][n]["freq"], exp_freq, rtol=RTOL), msg_txt + assert np.allclose(stats["harm"][n]["power"], exp_power, rtol=RTOL), msg_txt + + +@pytest.mark.parametrize("harms", np.random.randint(1, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(12, 16, RAND_NFFT))) +@pytest.mark.parametrize("leak", np.random.randint(1, 10, RAND_LEAK)) +def test_find_harmonics_with_leakage(harms, nfft, leak): + """Test the find harmonics method with spectral leakage.""" + nbin = np.random.randint(2 * leak, int(nfft / (2 * harms)) - 1) + nbin = nbin + round( + np.random.uniform(-0.5, 0.5), 2 + ) # Ensures we're not coherently sampled + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=leak + ) + + for n in range(2, harms + 1): + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, leak={leak}, index={n}" + bin_low = n * nbin - leak + bin_high = n * nbin + leak + assert bin_low <= stats["harm"][n]["bin"] <= bin_high, msg_txt + + +@pytest.mark.parametrize("harms", np.random.randint(2, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(12, 16, RAND_NFFT))) +@pytest.mark.parametrize("leak", np.random.randint(1, 10, RAND_LEAK)) +def test_find_harmonics_with_leakage_outside_bounds(harms, nfft, leak): + """Test find harmonics with leakage bins exceeding array bounds.""" + nbin = nfft / 4 - 0.5 + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + stats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=leak + ) + # Only check second harmonic which is guaranteed to be at edge of FFT + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, leak={leak}" + assert nfft / 2 - leak <= stats["harm"][2]["bin"] <= nfft / 2 - 1 + + +@pytest.mark.parametrize("harms", np.random.randint(6, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(8, 16, RAND_NFFT))) +def test_find_harmonics_on_fft_bound(harms, nfft): + """Test find harmonics with harmonics landing at nfft/2.""" + nbin = nfft / 8 + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.find_harmonics(pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=0) + + exp_bin = { + 2: 2 * nbin, + 3: 3 * nbin, + 4: 0, + 5: nfft - 5 * nbin, + } + + for n, exp_val in exp_bin.items(): + msg_txt = f"nfft={nfft}, nbin={nbin}, harm={harms}, index={n}" + assert stats["harm"][n]["bin"] == exp_val, msg_txt + + +@pytest.mark.parametrize("data", TEST_SNDR) +def test_sndr_sfdr_outputs(data): + """Test the sndr_sfdr method outputs.""" + freq = np.linspace(0, 1000, np.size(data)) + full_scale = -3 + nfft = 2**8 + fs = 1 + + psd_test = data.copy() + psd_exp = data.copy() + + result = calc.sndr_sfdr(psd_test, freq, fs, nfft, 0, full_scale=full_scale) + + data[0] = 0 + psd_exp[0] = 0 + data_string = f"F = {freq}\nD = {data}" + + indices = np.argsort(psd_exp) + sbin = indices[-1] + spurbin = indices[-2] + sfreq = freq[sbin] + spwr = psd_exp[sbin] + + psd_exp[sbin] = 0 + spurfreq = freq[spurbin] + spurpwr = psd_exp[spurbin] + + noise_pwr = np.sum(psd_exp[1:]) + + exp_return = { + "sig": { + "freq": sfreq, + "bin": sbin, + "power": spwr, + "dB": round(10 * np.log10(spwr), 1), + "dBFS": round(10 * np.log10(spwr) - full_scale, 1), + }, + "spur": { + "freq": spurfreq, + "bin": spurbin, + "power": spurpwr, + "dB": 10 * np.log10(spurpwr), + "dBFS": round(10 * np.log10(spurpwr) - full_scale, 1), + }, + "noise": { + "floor": 2 * noise_pwr / nfft, + "power": noise_pwr, + "rms": np.sqrt(noise_pwr), + "dBHz": round(10 * np.log10(2 * noise_pwr / nfft) - full_scale, 1), + "NSD": round( + 10 * np.log10(2 * noise_pwr / nfft) + - full_scale + - 2 * 10 * np.log10(fs / nfft), + 1, + ), + }, + "sndr": { + "dBc": round(10 * np.log10(spwr / noise_pwr), 1), + "dBFS": round(full_scale - 10 * np.log10(noise_pwr), 1), + }, + "sfdr": { + "dBc": round(10 * np.log10(spwr / spurpwr), 1), + "dBFS": round(full_scale - 10 * np.log10(spurpwr), 1), + }, + "enob": { + "bits": round((full_scale - 10 * np.log10(noise_pwr) - 1.76) / 6.02, 1), + }, + } + + for key, val in exp_return.items(): + for measure, measure_val in val.items(): + msg = f"{data_string}\n{key} -> {measure} | Expected {measure_val} | Got {result[key][measure]}" + assert np.allclose(measure_val, result[key][measure], rtol=RTOL), msg + + +@pytest.mark.parametrize("harms", np.random.randint(2, 21, RAND_HARMS)) +@pytest.mark.parametrize("nfft", 2 ** (np.random.randint(8, 16, RAND_NFFT))) +def test_plot_string(harms, nfft): + """Test proper return of plotting string.""" + nbin = np.random.randint(2, int(nfft / (2 * harms)) - 1) + (freq, pwr) = gen_spectrum(nbin, harms, nfft) + + stats = calc.sndr_sfdr(pwr, freq, 1, nfft, leak=0, full_scale=0) + hstats = calc.find_harmonics( + pwr, freq, nfft, nbin, AMPLITUDE, harms=harms, leak=0, fscale=1 + ) + + all_stats = {**stats, **hstats} + + plt_str = calc.get_plot_string( + all_stats, 0, 1, nfft, window="rectangular", xscale=1, fscale="Hz" + ) + + # Check for important information, not everything + msg_txt = f"{all_stats}\n{plt_str}" + + assert f"NFFT = {nfft}" in plt_str, msg_txt + assert f"ENOB = {all_stats['enob']['bits']} bits" in plt_str, msg_txt + assert f"SNDR = {all_stats['sndr']['dBFS']} dBFS" in plt_str, msg_txt + assert f"SFDR = {all_stats['sfdr']['dBFS']} dBFS" in plt_str, msg_txt + assert f"Noise Floor = {all_stats['noise']['dBHz']} dBFS" in plt_str, msg_txt + + for n in range(2, harms + 1): + harm_power = round(all_stats["harm"][n]["dB"], 1) + harm_freq = all_stats["harm"][n]["freq"] + assert f"HD{n} = {harm_power} dBFS @ {harm_freq} Hz" in plt_str, msg_txt diff --git a/tests/eval/test_spectrum.py b/tests/eval/test_spectrum.py new file mode 100644 index 0000000..4f11331 --- /dev/null +++ b/tests/eval/test_spectrum.py @@ -0,0 +1,334 @@ +"""Test the spectrum module.""" + +import pytest +import numpy as np +from unittest import mock +from adc_eval.eval import spectrum + + +RTOL = 0.05 +NLEN = 2**18 +NFFT = 2**10 +DATA_SINE = [ + { + "f1": np.random.randint(1, NFFT / 4 - 1), + "f2": np.random.randint(NFFT / 4, NFFT / 2 - 1), + "a1": np.random.uniform(low=0.5, high=0.8), + "a2": np.random.uniform(low=0.1, high=0.4), + } + for _ in range(10) +] + + +@mock.patch("adc_eval.eval.spectrum.calc_psd") +def test_get_spectrum(mock_calc_psd): + """Test that the get_spectrum method returns power spectrum.""" + fs = 4 + nfft = 3 + data = np.array([1]) + exp_spectrum = np.array([fs / nfft]) + + mock_calc_psd.return_value = (None, data, None, 2 * data) + + assert (None, exp_spectrum) == spectrum.get_spectrum( + None, fs=fs, nfft=nfft, single_sided=True + ) + + +@mock.patch("adc_eval.eval.spectrum.calc_psd") +def test_get_spectrum_dual(mock_calc_psd): + """Test that the get_spectrum method returns dual-sided power spectrum.""" + fs = 4 + nfft = 3 + data = np.array([1]) + exp_spectrum = np.array([fs / nfft]) + + mock_calc_psd.return_value = (None, data, None, 2 * data) + + assert (None, 2 * exp_spectrum) == spectrum.get_spectrum( + None, fs=fs, nfft=nfft, single_sided=False + ) + + +@pytest.mark.parametrize("data", [np.random.randn(NLEN) for _ in range(10)]) +def test_calc_psd_randomized_dual(data): + """Test calc_psd with random data.""" + (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 1, rtol=RTOL) + + +@pytest.mark.parametrize("data", [np.random.randn(NLEN) for _ in range(10)]) +def test_calc_psd_randomized_single(data): + """Test calc_psd with random data and single-sided.""" + (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 2, rtol=RTOL) + + +def test_calc_psd_zeros_dual(): + """Test calc_psd with zeros.""" + data = np.zeros(NLEN) + (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 0, rtol=RTOL) + + +def test_calc_psd_zeros_single(): + """Test calc_psd with zeros and single-sided..""" + data = np.zeros(NLEN) + (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 0, rtol=RTOL) + + +def test_calc_psd_ones_dual(): + """Test calc_psd with ones.""" + data = np.ones(NLEN) + (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 1, rtol=RTOL) + + +def test_calc_psd_ones_single(): + """Test calc_psd with ones and single-sided.""" + data = np.ones(NLEN) + (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=NFFT) + mean_val = np.mean(psd) + assert np.isclose(mean_val, 2, rtol=RTOL) + + +@pytest.mark.parametrize("data", DATA_SINE) +def test_calc_psd_two_sine_dual(data): + """Test calc_psd with two sine waves.""" + fs = 1 + fbin = fs / NFFT + f1 = data["f1"] * fbin + f2 = data["f2"] * fbin + a1 = data["a1"] + a2 = data["a2"] + + t = 1 / fs * np.linspace(0, NLEN - 1, NLEN) + pin = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) + + (_, _, freq, psd) = spectrum.calc_psd(pin, fs, nfft=NFFT) + + exp_peaks = [ + round(a1**2 / 4 * NFFT, 3), + round(a2**2 / 4 * NFFT, 3), + ] + + exp_f1 = [round(-f1, 2), round(f1, 2)] + exp_f2 = [round(-f2, 2), round(f2, 2)] + + peak1 = max(psd) + ipeaks = np.where(psd >= peak1 * (1 - RTOL))[0] + fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] + + assertmsg = f"f1={f1} | f2={f2} | a1={a1} | a2={a2}" + + assert np.allclose(peak1, exp_peaks[0], rtol=RTOL), assertmsg + assert np.allclose(fpeaks, exp_f1, rtol=RTOL), assertmsg + + psd[ipeaks[0]] = 0 + psd[ipeaks[1]] = 0 + + peak2 = max(psd) + ipeaks = np.where(psd >= peak2 * (1 - RTOL))[0] + fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] + + assert np.allclose(peak2, exp_peaks[1], rtol=RTOL), assertmsg + assert np.allclose(fpeaks, exp_f2), assertmsg + + +@pytest.mark.parametrize("data", DATA_SINE) +def test_calc_psd_two_sine_single(data): + """Test calc_psd with two sine waves, single-eided.""" + fs = 1 + fbin = fs / NFFT + f1 = data["f1"] * fbin + f2 = data["f2"] * fbin + a1 = data["a1"] + a2 = data["a2"] + + t = 1 / fs * np.linspace(0, NLEN - 1, NLEN) + pin = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) + + (freq, psd, _, _) = spectrum.calc_psd(pin, fs, nfft=NFFT) + + exp_peaks = [ + round(a1**2 / 2 * NFFT, 3), + round(a2**2 / 2 * NFFT, 3), + ] + exp_f1 = round(f1, 2) + exp_f2 = round(f2, 2) + + peak1 = max(psd) + ipeak = np.where(psd == peak1)[0][0] + fpeak = round(freq[ipeak], 2) + + assertmsg = f"f1={f1} | f2={f2} | a1={a1} | a2={a2}" + + assert np.allclose(peak1, exp_peaks[0], rtol=RTOL), assertmsg + assert np.allclose(fpeak, exp_f1), assertmsg + + psd[ipeak] = 0 + + peak2 = max(psd) + ipeak = np.where(psd == peak2)[0][0] + fpeak = round(freq[ipeak], 2) + + assert np.allclose(peak2, exp_peaks[1], rtol=RTOL), assertmsg + assert np.allclose(fpeak, exp_f2), assertmsg + + +def test_window_data_as_list(): + """Tests the window_data function when given a list instead of numpy array.""" + data = np.random.rand(NLEN).tolist() + wdata = spectrum.window_data(data, window="rectangular") + + assert type(data) == type(list()) + assert type(wdata) == type(np.ndarray([])) + + +def test_window_data_bad_window_type(capfd): + """Tests the window_data function with an incorrect window selection.""" + data = np.random.rand(NLEN) + wdata = spectrum.window_data(data, window="foobar") + captured = capfd.readouterr() + + assert data.size == wdata.size + assert data.all() == wdata.all() + assert "WARNING" in captured.out + + +@mock.patch("adc_eval.eval.spectrum.plot_spectrum") +def test_analyze_bad_input_scalar(mock_plot_spectrum, capfd): + """Tests bad input scalar keys.""" + mock_plot_spectrum.return_value = (None, None, None) + mock_plot_spectrum.side_effect = lambda *args, **kwargs: (kwargs, None, None) + (kwargs, _, _) = spectrum.analyze([0], 1, fscale="foobar") + captured = capfd.readouterr() + + assert "WARNING" in captured.out + assert kwargs.get("fscale") == ("MHz", 1e6) + + +@mock.patch("adc_eval.eval.spectrum.plot_spectrum") +def test_analyze_valid_input_scalar(mock_plot_spectrum): + """Tests the valid input scalar keys.""" + mock_plot_spectrum.return_value = (None, None, None) + mock_plot_spectrum.side_effect = lambda *args, **kwargs: (kwargs, None, None) + + test_vals = { + "Hz": 1, + "kHz": 1e3, + "MHz": 1e6, + "GHz": 1e9, + } + for key, val in test_vals.items(): + (kwargs, _, _) = spectrum.analyze([0], 1, fscale=key) + assert kwargs.get("fscale") == (key, val) + + +@mock.patch("adc_eval.eval.calc.sndr_sfdr") +@mock.patch("adc_eval.eval.calc.find_harmonics") +def test_analyze_no_plot(mock_sndr_sfdr, mock_find_harmonics): + """Tests the psd output of the analyze function with no plotting.""" + data = np.random.rand(NLEN) + data_sndr = { + "sig": {"bin": 1, "power": 2}, + } + data_harms = {"harmonics": 3} + exp_stats = {**data_sndr, **data_harms} + + mock_sndr_sfdr.return_value = data_sndr + mock_find_harmonics = data_harms + + (freq, psd, stats) = spectrum.analyze( + data, + fs=1, + nfft=NFFT, + dr=1, + harmonics=0, + leak=0, + window="rectangular", + no_plot=True, + yaxis="power", + single_sided=True, + fscale="Hz", + ) + + assert freq.all() == np.linspace(0, 1, int(NFFT / 2)).all() + assert psd.size == int(NFFT / 2) + + for key, value in stats.items(): + assert value == exp_stats[key] + + +@mock.patch("adc_eval.eval.calc.sndr_sfdr") +@mock.patch("adc_eval.eval.calc.find_harmonics") +def test_analyze_no_plot_dual(mock_sndr_sfdr, mock_find_harmonics): + """Tests the psd output of the analyze function with no plotting.""" + data = np.random.rand(NLEN) + data_sndr = { + "sig": {"bin": 1, "power": 2}, + } + data_harms = {"harmonics": 3} + exp_stats = {**data_sndr, **data_harms} + + mock_sndr_sfdr.return_value = data_sndr + mock_find_harmonics = data_harms + + (freq, psd, stats) = spectrum.analyze( + data, + fs=1, + nfft=NFFT, + dr=1, + harmonics=0, + leak=0, + window="rectangular", + no_plot=True, + yaxis="power", + single_sided=False, + fscale="Hz", + ) + + assert freq.all() == np.linspace(-0.5, 0.5, NFFT - 1).all() + assert psd.size == NFFT + for key, value in stats.items(): + assert value == exp_stats[key] + + +@mock.patch("adc_eval.eval.calc.sndr_sfdr") +@mock.patch("adc_eval.eval.calc.find_harmonics") +def test_analyze_no_plot_magnitude(mock_sndr_sfdr, mock_find_harmonics): + """Tests the psd output of the analyze function with no plotting.""" + data = np.random.rand(NLEN) + data_sndr = { + "sig": {"bin": 1, "power": 2}, + } + data_harms = {"harmonics": 3} + exp_stats = {**data_sndr, **data_harms} + + mock_sndr_sfdr.return_value = data_sndr + mock_find_harmonics = data_harms + + (freq, psd, stats) = spectrum.analyze( + data, + fs=1, + nfft=NFFT, + dr=1, + harmonics=0, + leak=0, + window="rectangular", + no_plot=True, + yaxis="magnitude", + single_sided=True, + fscale="Hz", + ) + + assert freq.all() == np.linspace(0, 1, int(NFFT / 2)).all() + assert psd.size == int(NFFT / 2) + for key, value in stats.items(): + assert value == exp_stats[key] diff --git a/tests/test_calc_psd.py b/tests/test_calc_psd.py deleted file mode 100644 index 1c5aa4f..0000000 --- a/tests/test_calc_psd.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Test the calc_psd method.""" - -import unittest -import numpy as np -from unittest import mock -from adc_eval.eval import spectrum - - -class TestCalcPSD(unittest.TestCase): - """Test the calc_psd method.""" - - def setUp(self): - """Initialize tests.""" - self.nfft = 2**8 - self.nlen = 2**18 - accuracy = 0.01 - self.bounds = [1 - accuracy, 1 + accuracy] - np.random.seed(1) - - def test_calc_psd_randomized_dual(self): - """Test calc_psd with random data.""" - for i in range(0, 10): - data = np.random.randn(self.nlen) - (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft) - mean_val = np.mean(psd) - self.assertTrue(self.bounds[0] <= mean_val <= self.bounds[1], msg=mean_val) - - def test_calc_psd_randomized_single(self): - """Test calc_psd with random data and single-sided.""" - for i in range(0, 10): - data = np.random.randn(self.nlen) - (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=self.nfft) - mean_val = np.mean(psd) - self.assertTrue( - 2 * self.bounds[0] <= mean_val <= 2 * self.bounds[1], msg=mean_val - ) - - def test_calc_psd_zeros_dual(self): - """Test calc_psd with zeros.""" - data = np.zeros(self.nlen) - (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft) - mean_val = np.mean(psd) - self.assertTrue( - self.bounds[0] - 1 <= mean_val <= self.bounds[1] - 1, msg=mean_val - ) - - def test_calc_psd_zeros_single(self): - """Test calc_psd with zeros and single-sided..""" - data = np.zeros(self.nlen) - (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=self.nfft) - mean_val = np.mean(psd) - self.assertTrue( - self.bounds[0] - 1 <= mean_val <= self.bounds[1] - 1, msg=mean_val - ) - - def test_calc_psd_ones_dual(self): - """Test calc_psd with ones.""" - data = np.ones(self.nlen) - (_, _, freq, psd) = spectrum.calc_psd(data, 1, nfft=self.nfft) - mean_val = np.mean(psd) - self.assertTrue(self.bounds[0] <= mean_val <= self.bounds[1], msg=mean_val) - - def test_calc_psd_ones_single(self): - """Test calc_psd with ones and single-sided.""" - data = np.ones(self.nlen) - (freq, psd, _, _) = spectrum.calc_psd(data, 1, nfft=self.nfft) - mean_val = np.mean(psd) - self.assertTrue( - 2 * self.bounds[0] <= mean_val <= 2 * self.bounds[1], msg=mean_val - ) - - def test_calc_psd_two_sine_dual(self): - """Test calc_psd with two sine waves.""" - fs = 1 - fbin = fs / self.nfft - f1 = 29 * fbin - f2 = 97 * fbin - a1 = 0.37 - a2 = 0.11 - t = 1 / fs * np.linspace(0, self.nlen - 1, self.nlen) - data = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) - (_, _, freq, psd) = spectrum.calc_psd(data, fs, nfft=self.nfft) - exp_peaks = [ - round(a1**2 / 4 * self.nfft, 3), - round(a2**2 / 4 * self.nfft, 3), - ] - exp_f1 = [round(-f1, 2), round(f1, 2)] - exp_f2 = [round(-f2, 2), round(f2, 2)] - - peak1 = max(psd) - ipeaks = np.where(psd >= peak1 * self.bounds[0])[0] - fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] - - self.assertEqual(round(peak1, 3), exp_peaks[0]) - self.assertListEqual(fpeaks, exp_f1) - - psd[ipeaks[0]] = 0 - psd[ipeaks[1]] = 0 - - peak2 = max(psd) - ipeaks = np.where(psd >= peak2 * self.bounds[0])[0] - fpeaks = [round(freq[ipeaks[0]], 2), round(freq[ipeaks[1]], 2)] - - self.assertEqual(round(peak2, 3), exp_peaks[1]) - self.assertListEqual(fpeaks, exp_f2) - - def test_calc_psd_two_sine_single(self): - """Test calc_psd with two sine waves, single-eided.""" - fs = 1 - fbin = fs / self.nfft - f1 = 29 * fbin - f2 = 97 * fbin - a1 = 0.37 - a2 = 0.11 - t = 1 / fs * np.linspace(0, self.nlen - 1, self.nlen) - data = a1 * np.sin(2 * np.pi * f1 * t) + a2 * np.sin(2 * np.pi * f2 * t) - (freq, psd, _, _) = spectrum.calc_psd(data, fs, nfft=self.nfft) - exp_peaks = [ - round(a1**2 / 2 * self.nfft, 3), - round(a2**2 / 2 * self.nfft, 3), - ] - exp_f1 = round(f1, 2) - exp_f2 = round(f2, 2) - - peak1 = max(psd) - ipeak = np.where(psd == peak1)[0][0] - fpeak = round(freq[ipeak], 2) - - self.assertEqual(round(peak1, 3), exp_peaks[0]) - self.assertEqual(fpeak, exp_f1) - - psd[ipeak] = 0 - - peak2 = max(psd) - ipeak = np.where(psd == peak2)[0][0] - fpeak = round(freq[ipeak], 2) - - self.assertEqual(round(peak2, 3), exp_peaks[1]) - self.assertEqual(fpeak, exp_f2) diff --git a/tests/test_filt.py b/tests/test_filt.py new file mode 100644 index 0000000..cb4e83c --- /dev/null +++ b/tests/test_filt.py @@ -0,0 +1,177 @@ +"""Test the filter module.""" + +import pytest +import numpy as np +from unittest import mock +from adc_eval import filt +from adc_eval import signals + + +@pytest.mark.parametrize("dec", np.random.randint(1, 20, 4)) +def test_cic_decimate_set_dec_updates_gain(dec): + """Tests that changing decimation factor updates gain.""" + cicfilt = filt.CICDecimate(dec=1, order=2) + assert cicfilt.gain == 1 + + cicfilt.dec = dec + assert cicfilt.gain == (dec**2) + + +@pytest.mark.parametrize("order", np.random.randint(1, 20, 4)) +def test_cic_decimate_set_order_updates_gain(order): + """Tests that changing filter order updates gain.""" + cicfilt = filt.CICDecimate(dec=2, order=1) + assert cicfilt.gain == 2 + + cicfilt.order = order + assert cicfilt.gain == (2**order) + + +def test_cic_decimate_returns_ndarray(): + """Tests the CICDecimate output data conversion.""" + cicfilt = filt.CICDecimate() + data = np.random.randn(100).tolist() + cicfilt._xout = data + + assert type(cicfilt.out) == type(np.array(list())) + assert cicfilt.out.all() == np.array(data).all() + + +@pytest.mark.parametrize("dec", np.random.randint(1, 20, 4)) +def test_cic_decimate_function(dec): + """Tests the CICDecimate decimate function.""" + cicfilt = filt.CICDecimate(dec=dec) + data = np.random.randn(100) + cicfilt.decimate(data) + + exp_result = data[::dec] + + assert cicfilt.out.size == exp_result.size + assert cicfilt.out.all() == exp_result.all() + + +def test_cic_decimate_function_none_input(): + """Tests the CICDecimate decimate function with no input arg.""" + cicfilt = filt.CICDecimate(dec=1) + data = np.random.randn(100) + cicfilt._xfilt = data + cicfilt.decimate() + + exp_result = data + + assert cicfilt.out.size == exp_result.size + assert cicfilt.out.all() == exp_result.all() + + +@pytest.mark.parametrize("nlen", np.random.randint(8, 2**10, 4)) +def test_cic_decimate_all_ones(nlen): + """Test the CICDecimate filtering with all ones.""" + cicfilt = filt.CICDecimate(dec=1, order=1) + data = np.ones(nlen) + cicfilt.run(data) + + exp_data = data.copy() + exp_data[0] = 0 + + assert cicfilt.out.all() == exp_data.all() + + +@pytest.mark.parametrize("nlen", np.random.randint(8, 2**10, 4)) +def test_cic_decimate_all_zeros(nlen): + """Test the CICDecimate filtering with all zeros.""" + cicfilt = filt.CICDecimate(dec=1, order=1) + data = np.zeros(nlen) + cicfilt.run(data) + + exp_data = data.copy() + + assert cicfilt.out.all() == exp_data.all() + + +@pytest.mark.parametrize("nlen", np.random.randint(8, 2**10, 4)) +def test_cic_decimate_impulse(nlen): + """Test the CICDecimate filtering with impulse.""" + cicfilt = filt.CICDecimate(dec=1, order=1) + data = signals.impulse(nlen) + cicfilt.run(data) + + exp_data = np.concatenate([[0], data[0:-1]]) + + assert cicfilt.out.all() == exp_data.all() + + +def test_fir_lowpass_returns_ndarray(): + """Tests the FIRLowPass output data conversion.""" + fir = filt.FIRLowPass() + data = np.random.randn(100).tolist() + fir._out = data + + assert type(fir.out) == type(np.array(list())) + assert fir.out.all() == np.array(data).all() + + +@pytest.mark.parametrize("dec", np.random.randint(1, 20, 4)) +def test_fir_decimate_function(dec): + """Tests the FIRLowPass decimate function.""" + fir = filt.FIRLowPass(dec=dec) + data = np.random.randn(100) + fir.decimate(data) + + exp_result = data[::dec] + + assert fir.out.size == exp_result.size + assert fir.out.all() == exp_result.all() + + +def test_fir_decimate_function_none_input(): + """Tests the FIRLowPass decimate function with no input arg.""" + fir = filt.FIRLowPass(dec=1) + data = np.random.randn(100) + fir.yfilt = data + fir.decimate() + + exp_result = data + + assert fir.out.size == exp_result.size + assert fir.out.all() == exp_result.all() + + +@mock.patch("adc_eval.filt.remez") +def test_fir_lowpass_tap_generation(mock_remez, capfd): + """Tests the FIRLowPass decimate function.""" + fir = filt.FIRLowPass() + fir.ntaps = 3 + fir.bit_depth = 12 + mock_remez.return_value = np.ones(3) + + (taps, coeffs) = fir.generate_taps(0.1) + + captured = capfd.readouterr() + exp_coeffs = [2**12, 2**12, 2**12] + + assert "WARNING" in captured.out + assert taps == 3 + assert coeffs == exp_coeffs + + +@pytest.mark.parametrize("ntaps", np.random.randint(3, 511, 5)) +def test_fir_lowpass_run(ntaps): + """Tests the FIRLowPass run function.""" + fir = filt.FIRLowPass() + fir.ntaps = ntaps + fir.bit_depth = 10 + fir.coeffs = 2**10 * np.ones(ntaps) + data = signals.impulse(2**12) + fir.run(data) + + exp_sum = np.ceil((ntaps + 1) / 2) + out_sum = sum(fir.out) + + tap_val = int(exp_sum) + + assert fir.out.size == data.size + assert max(fir.out) == 1 + assert min(fir.out) == 0 + assert fir.out[0:tap_val].all() == 1 + assert fir.out[tap_val + 1 :].all() == 0 + assert out_sum == exp_sum diff --git a/tests/test_signals.py b/tests/test_signals.py new file mode 100644 index 0000000..5e94d22 --- /dev/null +++ b/tests/test_signals.py @@ -0,0 +1,81 @@ +"""Test the signals module.""" + +import pytest +import numpy as np +from scipy import stats +from adc_eval import signals + + +RTOL = 0.01 + + +@pytest.mark.parametrize("nlen", np.random.randint(4, 2**16, 3)) +@pytest.mark.parametrize("fs", np.random.uniform(1, 1e9, 3)) +def test_time(nlen, fs): + """Test time with random data.""" + value = signals.time(nlen, fs=fs) + assert value.size == nlen + assert value[0] == 0 + assert np.isclose(value[nlen - 1], (nlen - 1) / fs, rtol=RTOL) + + +@pytest.mark.parametrize("nlen", np.random.randint(2**10, 2**16, 3)) +@pytest.mark.parametrize("offset", np.random.uniform(-10, 10, 3)) +@pytest.mark.parametrize("amp", np.random.uniform(0, 10, 3)) +def test_sin(nlen, offset, amp): + """Test sine generation with random data.""" + fs = np.random.uniform(1, 1e9) + fin = np.random.uniform(fs / 10, fs / 3) + + t = signals.time(nlen, fs=fs) + value = signals.sin(t, amp=amp, offset=offset, freq=fin) + + exp_peaks = [offset - amp, amp + offset] + + assert value.size == nlen + assert np.isclose(max(value), exp_peaks[1], rtol=RTOL) + assert np.isclose(min(value), exp_peaks[0], rtol=RTOL) + assert value[0] == offset + + +@pytest.mark.parametrize("nlen", np.random.randint(1, 2**16, 5)) +def test_noise_length(nlen): + """Test noise generation with random data.""" + t = signals.time(nlen, fs=1) + value = signals.noise(t, mean=0, std=1) + + # Just check correct size + assert value.size == nlen + + +@pytest.mark.parametrize("std", np.random.uniform(0, 1, 4)) +def test_noise_length(std): + """Test noise is gaussian with random data.""" + nlen = 2**12 + t = signals.time(nlen, fs=1) + noise = signals.noise(t, mean=0, std=std) + autocorr = np.correlate(noise, noise, mode="full") + autocorr /= max(autocorr) + asize = autocorr.size + + midlag = autocorr.size // 2 + acorr_nopeak = np.concatenate([autocorr[0 : midlag - 1], autocorr[midlag + 1 :]]) + + shapiro = stats.shapiro(acorr_nopeak) + + # Check that middle lag is 1 + assert autocorr[midlag] == 1 + + # Now check that noise is gaussian + assert shapiro.pvalue < 0.01 + + +@pytest.mark.parametrize("nlen", np.random.randint(2, 2**12, 3)) +@pytest.mark.parametrize("mag", np.random.uniform(0.1, 100, 3)) +def test_impulse(nlen, mag): + """Test impulse generation with random length and amplitude.""" + data = signals.impulse(nlen, mag) + + assert data.size == nlen + assert data[0] == mag + assert data[1:].all() == 0 diff --git a/tests/test_spectrum.py b/tests/test_spectrum.py deleted file mode 100644 index 3a46f34..0000000 --- a/tests/test_spectrum.py +++ /dev/null @@ -1,108 +0,0 @@ -"""Test the spectrum module.""" - -import unittest -import numpy as np -from unittest import mock -from adc_eval.eval import spectrum - - -class TestSpectrum(unittest.TestCase): - """Test the spectrum module.""" - - def setUp(self): - """Initialize tests.""" - pass - - def test_db_to_pow_places(self): - """Test the db_to_pow conversion with multiple places.""" - test_val = 29.9460497 - exp_val = [988, 987.7, 987.65, 987.654, 987.6543] - - for i in range(0, len(exp_val)): - self.assertEqual(spectrum.db_to_pow(test_val, places=i), exp_val[i]) - - def test_db_to_pow_ndarray(self): - """Test db_to_pow with ndarray input.""" - test_val = np.array([30.0]) - self.assertEqual(spectrum.db_to_pow(test_val), np.array([1000.0])) - - def test_dbW(self): - """Test the dbW conversion with normal inputs.""" - test_val = 9.7197255 - exp_val = [10, 9.9, 9.88, 9.877, 9.8765] - - for i in range(0, len(exp_val)): - self.assertEqual(spectrum.dBW(test_val, places=i), exp_val[i]) - - def test_dbW_ndarray(self): - """Test dbW with ndarray input.""" - test_val = np.array([100.0]) - self.assertEqual(spectrum.dBW(test_val), np.array([20.0])) - - def test_enob(self): - """Test enob with muliple places.""" - test_val = 60.123456 - exp_val = [10, 9.7, 9.69, 9.695, 9.6949] - - for i in range(0, len(exp_val)): - self.assertEqual(spectrum.enob(test_val, places=i), exp_val[i]) - - @mock.patch("adc_eval.eval.spectrum.calc_psd") - def test_get_spectrum(self, mock_calc_psd): - """Test that the get_spectrum method returns power spectrum.""" - fs = 4 - nfft = 3 - data = np.array([1]) - exp_spectrum = np.array([fs / nfft]) - - mock_calc_psd.return_value = (None, data, None, data) - - self.assertEqual( - spectrum.get_spectrum(None, fs=fs, nfft=nfft), (None, exp_spectrum) - ) - - def test_sndr_sfdr_outputs(self): - """Test the sndr_sfdr method outputs.""" - data = np.array([1, 2, 91, 7]) - freq = np.array([100, 200, 300, 400]) - full_scale = -3 - nfft = 2**8 - fs = 1 - exp_return = { - "sig": { - "freq": 300, - "bin": 2, - "power": 91, - "dB": 19.6, - "dBFS": round(19.590 - full_scale, 1), - }, - "spur": { - "freq": 400, - "bin": 3, - "power": 7, - "dB": 8.5, - "dBFS": round(8.451 - full_scale, 1), - }, - "noise": { - "floor": 18 / nfft, - "power": 9, - "rms": 3, - "dBHz": round(-11.529675 - full_scale, 1), - "NSD": round(36.6351 - full_scale, 1), - }, - "sndr": { - "dBc": 10.0, - "dBFS": round(full_scale - 9.542, 1), - }, - "sfdr": { - "dBc": 11.1, - "dBFS": round(full_scale - 8.451, 1), - }, - "enob": { - "bits": round((full_scale - 11.3024) / 6.02, 1), - }, - } - - result = spectrum.sndr_sfdr(data, freq, fs, nfft, 0, full_scale=full_scale) - for key, val in exp_return.items(): - self.assertDictEqual(result[key], val, msg=key) diff --git a/tests/test_spectrum_plotting.py b/tests/test_spectrum_plotting.py deleted file mode 100644 index d3e5d2d..0000000 --- a/tests/test_spectrum_plotting.py +++ /dev/null @@ -1,136 +0,0 @@ -"""Test the spectrum plotting functions.""" - -import unittest -import numpy as np -from unittest import mock -from adc_eval.eval import spectrum - - -class TestSpectrumPlotting(unittest.TestCase): - """Test the spectrum module.""" - - def setUp(self): - """Initialize tests.""" - self.nlen = 2**16 - self.nfft = 2**12 - self.fs = 1 - self.bin = 13 - self.arms = 0.5 / np.sqrt(2) - self.fin = self.fs / self.nfft * self.bin - - def gen_spectrum(self, harmonics): - """Generate a wave with arbitrary harmonics.""" - t = 1 / self.fs * np.linspace(0, self.nlen - 1, self.nlen) - vin = np.zeros(len(t)) - for i in range(1, harmonics + 1): - vin += np.sqrt(2) * self.arms / i * np.sin(2 * np.pi * i * self.fin * t) - - return spectrum.get_spectrum(vin, fs=self.fs, nfft=self.nfft) - - def test_find_harmonics(self): - """Test the find harmonics method.""" - for i in range(2, 10): - (freq, pwr) = self.gen_spectrum(10) - - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=i, leak=0 - ) - - for x in range(2, i + 1): - msg_txt = f"harm={i}, index={x}" - self.assertEqual(stats["harm"][x]["bin"], x * self.bin, msg=msg_txt) - self.assertEqual( - round(stats["harm"][x]["power"], 4), - round((self.arms / x) ** 2, 4), - msg=msg_txt, - ) - self.assertEqual( - stats["harm"][x]["freq"], - round(freq[x * self.bin] / 1e6, 1), - msg=msg_txt, - ) - - def test_find_harmonics_with_leakage(self): - """Test the find harmonics method with spectral leakage.""" - self.bin = 13.5 - leakage_bins = 5 - for i in range(2, 10): - (freq, pwr) = self.gen_spectrum(10) - - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=i, leak=leakage_bins - ) - - for x in range(2, i + 1): - msg_txt = f"harm={i}, index={x}" - self.assertTrue( - x * self.bin - leakage_bins - <= stats["harm"][x]["bin"] - <= x * self.bin + leakage_bins, - msg=msg_txt, - ) - - def test_find_harmonics_with_leakage_outside_bounds(self): - """Test find harmonics with leakage bins exceeding array bounds.""" - self.bin = self.nfft / 4 - 0.5 - (freq, pwr) = self.gen_spectrum(5) - leakage_bins = 2 - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=2, leak=leakage_bins - ) - self.assertTrue(self.nfft / 2 - 3 <= stats["harm"][2]["bin"], self.nfft / 2 - 1) - - def test_find_harmonics_on_fft_bound(self): - """Test find harmonics with harmonics landing at nfft/2.""" - self.nfft = 2**12 - self.bin = self.nfft / 8 - (freq, pwr) = self.gen_spectrum(10) - leakage_bins = 0 - stats = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=5, leak=leakage_bins - ) - self.assertEqual(stats["harm"][2]["bin"], 2 * self.bin) - self.assertEqual(stats["harm"][3]["bin"], 3 * self.bin) - self.assertEqual(stats["harm"][4]["bin"], 0) - self.assertEqual(stats["harm"][5]["bin"], self.nfft - 5 * self.bin) - - def test_plot_string(self): - """Test proper return of plotting string.""" - self.bin = 13 - (freq, pwr) = self.gen_spectrum(3) - stats = spectrum.sndr_sfdr(pwr, freq, 1, self.nfft, leak=0, full_scale=0) - harms = spectrum.find_harmonics( - pwr, freq, self.nfft, self.bin, self.arms, harms=3, leak=0 - ) - all_stats = {**stats, **harms} - - plt_str = spectrum.get_plot_string( - all_stats, 0, self.fs, self.nfft, window="rectangular" - ) - - # Check for important information, not everything - msg_txt = f"{all_stats}\n{plt_str}" - self.assertTrue(f"NFFT = {self.nfft}" in plt_str, msg=msg_txt) - self.assertTrue( - f"ENOB = {all_stats['enob']['bits']} bits" in plt_str, msg=msg_txt - ) - self.assertTrue( - f"SNDR = {all_stats['sndr']['dBFS']} dBFS" in plt_str, msg=msg_txt - ) - self.assertTrue( - f"SFDR = {all_stats['sfdr']['dBFS']} dBFS" in plt_str, msg=msg_txt - ) - self.assertTrue( - f"Noise Floor = {all_stats['noise']['dBHz']} dBFS" in plt_str, - msg=msg_txt, - ) - self.assertTrue( - f"HD2 = {round(all_stats['harm'][2]['dB'], 1)} dBFS @ {all_stats['harm'][2]['freq']} MHz" - in plt_str, - msg=msg_txt, - ) - self.assertTrue( - f"HD3 = {round(all_stats['harm'][3]['dB'], 1)} dBFS @ {all_stats['harm'][3]['freq']} MHz" - in plt_str, - msg=msg_txt, - )