Skip to content

Commit e0c4c02

Browse files
committed
tests: Verify waveform sessions
The number of samples needs to be kept down to avoid overflowing the buffer.
1 parent 24b9fbe commit e0c4c02

1 file changed

Lines changed: 77 additions & 0 deletions

File tree

tests/test_signals.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import pytest # type: ignore
88

99
import nixnet
10+
from nixnet import _ctypedefs
11+
from nixnet._session import signals as session_signals
1012

1113

1214
@pytest.fixture
@@ -86,3 +88,78 @@ def test_singlepoint_loopback(nixnet_in_interface, nixnet_out_interface):
8688
actual_signals = list(input_session.signals.read())
8789
for expected, (_, actual) in zip(expected_signals, actual_signals):
8890
assert pytest.approx(expected, rel=1) == actual
91+
92+
93+
def test_waveform_unflatten_empty():
94+
signals = session_signals.WaveformInSignals._unflatten_signals([], 0, 0)
95+
assert len(signals) == 0
96+
97+
98+
def test_waveform_unflatten_single_signal():
99+
signals = session_signals.WaveformInSignals._unflatten_signals(
100+
[_ctypedefs.f64(v) for v in [0, 1, 2, 3]],
101+
4,
102+
1)
103+
assert signals == [[0, 1, 2, 3]]
104+
105+
106+
def test_waveform_unflatten_multi_signal():
107+
signals = session_signals.WaveformInSignals._unflatten_signals(
108+
[_ctypedefs.f64(v) for v in [0, 1, 2, 3]],
109+
2,
110+
2)
111+
assert signals == [[0, 1], [2, 3]]
112+
113+
114+
def generate_ramp(min, max, rate, length):
115+
"""Generate ramp for test data
116+
117+
Args:
118+
min(float): minimum value
119+
max(float): maximum value
120+
rate(float): rate data is transmitted (Hz)
121+
length(float): Duration of ramp (secs)
122+
"""
123+
samples = int(rate * length)
124+
step = (max - min) / samples
125+
return [
126+
min + step * i
127+
for i in range(samples)
128+
]
129+
130+
131+
@pytest.mark.integration
132+
def test_waveform_loopback(nixnet_in_interface, nixnet_out_interface):
133+
database_name = 'NIXNET_example'
134+
cluster_name = 'CAN_Cluster'
135+
signal_names = 'CANEventSignal1'
136+
137+
with nixnet.SignalInWaveformSession(
138+
nixnet_in_interface,
139+
database_name,
140+
cluster_name,
141+
signal_names) as input_session:
142+
with nixnet.SignalOutWaveformSession(
143+
nixnet_out_interface,
144+
database_name,
145+
cluster_name,
146+
signal_names) as output_session:
147+
output_session.signals.resamp_rate = 1000
148+
input_session.signals.resamp_rate = output_session.signals.resamp_rate / 2
149+
# Start the input session manually to make sure that the first
150+
# frame value sent before the initial read will be received.
151+
input_session.start()
152+
153+
signal_ramp = generate_ramp(4, 11, output_session.signals.resamp_rate, 0.25)
154+
output_session.signals.write([signal_ramp])
155+
156+
# Wait 1 s and then read the received values.
157+
# They should be the same as the ones sent.
158+
time.sleep(1)
159+
160+
t0, dt, waveforms = input_session.signals.read(len(signal_ramp))
161+
print(t0)
162+
assert pytest.approx(dt, 1 / input_session.signals.resamp_rate)
163+
assert len(waveforms) == 1
164+
for expected, actual in zip(signal_ramp, waveforms[0]):
165+
assert pytest.approx(expected, rel=0.1) == actual

0 commit comments

Comments
 (0)