-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmainThread.py
More file actions
246 lines (185 loc) · 7 KB
/
mainThread.py
File metadata and controls
246 lines (185 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
@file mainThread.py
@author Riccardo Cavallari
@date 2019.10.14
Simulation of a system composed of
- a producer of audio packets
- a lossy medium
- a consumer of audio packet
The producer generates packets with period T_in; packets are transmitted to
on the medium with probability P and with period T_m; consumer reproduces packets
with period T_out. All three timers are subject to precisioin error and are not
alligned.
Goals of the simulation are, evaluate latency and buffer overflow/underflow
at the consumer.
Next steps:
- add asynchronous sampling rate conversion
- add channel model
- add GUI for plotting and playing sound
"""
"""
Impossible to achieve enough accuracy for timers in Windows.
https://stackoverflow.com/questions/7079864/real-time-operating-via-python
"""
import threading
import time
import packet
import random
import wave
from array import array
from collections import deque
from vcd import VCDWriter
PROB_THRESHOLD = 2
FRAMES_PER_PACKET = 160 # mono, 10 ms @ 16 kHz sampl. rate
FRAME_INTERVAL = 10e-3
TRANSPORT_INTERVAL = 7.5e-3 #FRAME_INTERVAL
IN_FIFO_LEN = 10
OUT_FIFO_LEN = IN_FIFO_LEN
RADIO_FIFO_LEN = 10
# packet + ack + 2*Tifs
# TODO: use real numbers and dependent on packet len and PHY
PACKET_DURATION = (800e-6 + 300e-6)
CONNECTION_EVENT = 0.75 * TRANSPORT_INTERVAL
MAX_TX_CONN_EVENT = int(CONNECTION_EVENT/PACKET_DURATION)
underflowCnt = 0
overflowCnt = 0
radioOverflow = 0
vcdFile = open("out.vcd", "w")
vcd = VCDWriter(vcdFile, timescale='1 ns')
consWire = vcd.register_var('libsigrok', 'consumer', 'wire', size=1, ident='!')
bleWire = vcd.register_var('libsigrok', 'BLE', 'wire', size=1, ident='$')
start = time.perf_counter_ns()
# FIFO to hold packet ready to bt tx-ed or retx-ed
fifoRadio = deque([], RADIO_FIFO_LEN)
def createPacket(payload):
""" Create a packet with a given payload """
pkt = packet.Packet(payload)
return pkt
def producerCallback(fifo, wf, e):
""" read frames from the wav input file and add it to the input FIFO.
This represents the output of the codec """
# add packets to the input FIFO...
payload = wf.readframes(FRAMES_PER_PACKET)
# ... until the wav file is not over
while (payload != b''):
packet = createPacket(payload)
fifo.append(packet)
# print("Producer", packet.seqNum)
time.sleep(FRAME_INTERVAL)
payload = wf.readframes(FRAMES_PER_PACKET)
# signal to the other threads that we are done
e.set()
def consumerCallback(fifo, file, e):
""" Get a packet from output FIFO and reproduce it """
global underflowCnt
while True:
try:
packet = fifo.popleft()
#print("Consumer", packet.seqNum)
# TODO: don't write a file, store payloads in some data structure
# and use a paudio callback to retrieve data from that data
# structure and reproduce it.
file.writeframes(packet.payload)
except IndexError: # no packet in the FIFO!
if e.is_set():
return
file.writeframes(bytes(2*FRAMES_PER_PACKET)) # write 0s
underflowCnt += 1
print("Output FIFO is empty, consumer underflow!")
end = time.perf_counter_ns()
timestamp = end - start
vcd.change(consWire, timestamp, 0)
vcd.change(consWire, timestamp + 1000, 1)
vcd.change(consWire, timestamp + 2000, 0)
time.sleep(FRAME_INTERVAL)
def aclTransportCallback(fifoIn, fifoOut, e):
""" Simulate an ACL tranport between a master and a slave. Packets are
retransmitted until they are successfully acknowledged. Packets are taken
from fifoIn and put in fifoOut if successfully transmitted. """
global fifoRadio, radioOverflow
while True:
try:
# get the packet scheduled for this event
schedPacket = fifoIn.popleft()
schedPacket.txAttemps = packet.FLUSH_TIMEOUT_INF
if fifoRadio.maxlen == len(fifoRadio):
radioOverflow += 1
fifoRadio.append(schedPacket)
except IndexError:
#print("Input FIFO is empty!")
1==1
# schedule packet transmission slots for this connection event
for _ in range(min(len(fifoRadio), MAX_TX_CONN_EVENT)):
txCallback(fifoRadio, fifoOut)
#time.sleep(PACKET_DURATION)
if e.is_set():
return
end = time.perf_counter_ns()
timestamp = end - start
vcd.change(bleWire, timestamp, 0)
vcd.change(bleWire, timestamp + 1000, 1)
vcd.change(bleWire, timestamp + 2000, 0)
# tis sould sleep less tan TRANSPORT_INTERVAL because it needs to
# account for te execution time of te txCallback. Tis is wy we ave
# to comment #time.sleep(PACKET_DURATION).
time.sleep(TRANSPORT_INTERVAL)
def txCallback(fifoIn, fifoOut):
""" Get a packet from the input FIFO and send it over the air. The packet
is successfully transmitted with probability PROB_THRESHOLD """
global overflowCnt
try:
packet = fifoIn.popleft()
if (random.random() < PROB_THRESHOLD):
if fifoOut.maxlen == len(fifoOut):
overflowCnt += 1
#print("Packet", packet.seqNum, "OK, re-tx", packet.txAttemps)
fifoOut.append(packet)
else:
#print("Packet", packet.seqNum, "Error")
# decrease txAttempts and put the packet back in the FIFO
packet.txAttemps -= 1
if (packet.txAttemps):
fifoIn.appendleft(packet)
else:
del packet
except IndexError:
print("Radio FIFO is empty!")
def main():
""" Main function of the simulator """
# input and output FIFOs
fifoIn = deque([], IN_FIFO_LEN)
fifoOut = deque([], OUT_FIFO_LEN)
# input wave file
wf = wave.open('440Hz.wav', 'rb')
# output wave file
wof = wave.open('output.wav', 'wb')
wof.setnchannels(1)
wof.setsampwidth(2)
wof.setframerate(16000)
# event to signal the consumer to stop consuming
e = threading.Event()
# Producer thread
producer = threading.Thread(target=producerCallback,
args=(fifoIn, wf, e))
# Bluetooth thread
bluetooth = threading.Thread(target=aclTransportCallback,
args=(fifoIn, fifoOut, e))
# Consumer thread
consumer = threading.Thread(target=consumerCallback,
args=(fifoOut, wof, e))
producer.start()
bluetooth.start()
time.sleep((OUT_FIFO_LEN/2)*FRAME_INTERVAL) # wait until the fifo is full
consumer.start()
# wait until all threads terminate
producer.join()
bluetooth.join()
consumer.join()
print("Consumer underflow:", underflowCnt)
print("Consumer overflow:", overflowCnt)
print("Radio overflow:", radioOverflow)
wof.close()
vcd.close()
vcdFile.close()
if __name__ == '__main__':
main()