-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
544 lines (492 loc) · 18.1 KB
/
main.py
File metadata and controls
544 lines (492 loc) · 18.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
import argparse
import importlib.util
import shutil
import signal
import subprocess
import sys
import time
from collections import deque
import numpy as np
import soundcard as sc
from pyfftw.interfaces.numpy_fft import rfft
import tui
pw_loopback = None
def log_band_volumes(data, freqs, num_bands, band_edges, max_ref):
"""Get logarythmic volume in dB for specified number of bands, from sound sample, with interpolation between bands"""
# get magnitude from fft
raw_magnitude = np.abs(rfft(data, threads=1))
# split into logarithmic bands
magnitude = np.zeros(num_bands)
for i in range(num_bands):
left = band_edges[i]
right = band_edges[i+1]
idx = np.where((freqs >= left) & (freqs < right))[0]
# interpolate between bands
if len(idx) == 0:
left_bin = np.searchsorted(freqs, left)
right_bin = np.searchsorted(freqs, right)
bins = []
if left_bin > 0:
bins.append(left_bin - 1)
if left_bin < len(freqs):
bins.append(left_bin)
if right_bin > 0 and right_bin != left_bin:
bins.append(right_bin - 1)
if right_bin < len(freqs) and right_bin != left_bin:
bins.append(right_bin)
bins = np.array(list(set(bins)))
weights = []
for b in bins:
center = freqs[b]
band_center = (left + right) / 2
d = abs(center - band_center) + 1e-6
weights.append(1/d)
weights = np.array(weights)
weights /= np.sum(weights)
magnitude[i] = np.sqrt(np.sum((raw_magnitude[bins]**2) * weights)) # weighted RMS
else:
magnitude[i] = np.sqrt(np.mean(raw_magnitude[idx]**2)) # RMS
# magnitude to negative dB
db = 20 * np.log10(magnitude / max_ref + 1e-12) # add small value to avoid log(0)
return np.maximum(db, -90)
def connect_pipewire(output_node_name, target_node_name=None, only_get_name=False):
"""Connect to output with custom loopback device. This prevents headsets from switching to 'handsfree' mode"""
global pw_loopback
# check if pipewire is running
if "pipewire" not in subprocess.check_output(["ps", "-A"], text=True):
sys.exit("Pipewire process not found")
# check if pipewire commands are available
if not (shutil.which("pw-link") or shutil.which("pw-loopback")):
sys.exit("pw-link and pw-loopback commands not found")
if target_node_name and ":" in target_node_name:
target_node_name = target_node_name.split(":")[0]
# find node that output is connected to
command = ["pw-link", "--links"]
proc = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
links = proc.communicate()[0].decode().split("\n")
last_nodes = []
for num, link in enumerate(links):
if target_node_name and target_node_name in link:
last_nodes.append(target_node_name)
break
if not target_node_name and f"|-> {output_node_name}" in link:
node_name = links[num-1].split(":")[0].strip()
if node_name not in last_nodes:
last_nodes.append(node_name)
if not last_nodes:
sys.exit("Could not find active pipewire links. Make sure audio is playing when starting spectroterm or specify custom node name.")
if only_get_name:
return last_nodes
# start loopback node
command = [
"pw-loopback",
"--capture-props", 'node.autoconnect=false node.name=spectroterm-capture node.description="Spectroterm Capture"',
"--playback-props", 'node.autoconnect=false media.class=Audio/Source node.name=spectroterm node.description="Spectroterm"',
]
pw_loopback = subprocess.Popen(
command,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
time.sleep(0.1) # delay for pw-loopback to create nodes
# link loopback node
for node in last_nodes:
proc = subprocess.Popen(
["pw-link", node, "spectroterm-capture"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return "spectroterm"
def db_to_height(db, min_db, max_db, bar_height):
"""Calculate height of bars from sound volume"""
return np.clip(np.round(np.interp(db, (min_db, max_db), (0, bar_height))).astype(np.int32), 0, bar_height)
def generate_log_x_axis(lines, num_bars, min_freq, max_freq):
"""Draw logarythmic Hz x axis"""
line = [" "] * num_bars
freqs = [30, 100, 200, 500, 1000, 2000, 5000, 10000, 16000]
band_edges = np.logspace(np.log10(min_freq), np.log10(max_freq), num_bars + 1)
for freq in freqs:
if band_edges[0] < freq < band_edges[-1]:
pos = np.argmin(np.abs(band_edges - freq))
if 0 <= pos < num_bars:
if freq >= 1000:
label = f"{round(freq/1000)}k"
else:
label = str(round(freq))
if pos < num_bars - 5:
for i, ch in enumerate(label):
line[pos + i] = ch
line = "".join(line[:-2]) + "Hz"
lines[-1] += line
return lines
def generate_log_y_axis(lines, bar_height, min_db, max_db):
"""Draw logarythmic dB y axis"""
levels = list(range(int(min_db), int(max_db) + 1, 10))
added = []
label_len = 0
for db in levels:
# get y coordinate
pos = int(np.interp(db, (min_db, max_db), (bar_height, 0)))
label = str(db).rjust(3)
if 0 < pos < bar_height:
lines[pos] += label
added.append(pos)
label_len = len(label)
for num, line in enumerate(lines):
if num not in added and num != 0:
lines[num] += label_len * " "
lines[0] += " dB"
return lines
def generate_ui(draw_box, draw_axes, min_freq, max_freq, min_db, max_db, h, w):
"""Draw UI"""
bar_height = h - draw_box - draw_axes
num_bars = w - 2 * draw_box - 3 * draw_axes
if draw_box:
top_line = "┌─Spectrum Analyzer" + "─" * (w - 20) + "┐"
bot_line = "└" + "─" * (w - 2) + "┘"
left_lines = ["│"] * (h - 2)
right_lines = ["│"] * (h - 2)
else:
top_line = ""
bot_line = ""
left_lines = [""] * h
right_lines = [""] * h
if draw_axes:
left_lines = generate_log_y_axis(left_lines, bar_height, min_db, max_db)
left_lines = generate_log_x_axis(left_lines, num_bars, min_freq, max_freq)
left_lines = [top_line] + left_lines + [bot_line]
return left_lines, right_lines, bar_height, num_bars
def generate_spectrum(left_lines, right_lines, bar_heights, peak_heights, bar_height, bar_character, peak_character, peaks, box, axes, colors):
"""Draw spectrum bars with peaks"""
lines = []
if box:
lines.append(left_lines[0])
width = bar_heights.shape[0]
for y_raw in range(bar_height - box):
y = y_raw + box
line = [" "] * width
for i in range(width):
bar = bar_heights[i]
if y_raw >= bar_height - bar:
line[i] = bar_character
if peaks and y_raw == bar_height - peak_heights[i]:
line[i] = peak_character
if colors:
relative = (bar_height - y_raw) / bar_height
if relative < 0.5:
color = colors[0]
elif relative < 0.8:
color = colors[1]
else:
color = colors[2]
lines.append(left_lines[y] + f"\x1b[38;5;{color}m" + "".join(line) + "\x1b[0m" + right_lines[y_raw])
else:
lines.append(left_lines[y] + "".join(line) + right_lines[y_raw])
if axes:
lines.append(left_lines[-2] + right_lines[-1])
if box:
lines.append(left_lines[-1])
return lines
# use cython if available
if importlib.util.find_spec("spectrum_cython"):
from spectrum_cython import generate_spectrum, log_band_volumes
def main(args):
"""Main app function"""
# load config
if args.color:
colors = (args.green, args.orange,args.red)
else:
colors = None
box = args.box
axes = args.axes
peaks = args.peaks
fall_speed = args.fall_speed
bar_character = args.bar_character[0]
peak_character = args.peak_character[0]
sample_rate = args.sample_rate
sample_size = args.sample_size / 1000
reference_max = args.reference_max
peak_hold = args.peak_hold / 1000
min_freq = args.min_freq
max_freq = args.max_freq
min_db = args.min_db
max_db = args.max_db
pipewire_fix = args.pipewire_fix
pipewire_node_id = args.pipewire_node_id
delay = args.delay
# detect bluetooth device
if args.bt_delay:
if "blue" in sc.default_speaker().id:
delay = args.bt_delay
prev_bar_heights = None
prev_update_time = time.perf_counter()
peak_heights = np.array([], dtype="int32")
numframes = int(sample_rate * sample_size)
delay_frames = int(sample_rate * delay / 1000)
freqs = np.fft.rfftfreq(numframes, 1 / sample_rate)
# get loopback device
if pipewire_fix:
mic_id = connect_pipewire(sc.default_speaker().id, pipewire_node_id)
if not mic_id:
mic_id = sc.default_speaker().name
else:
mic_id = sc.default_speaker().name
loopback_mic = sc.get_microphone(mic_id, include_loopback=True)
try:
with loopback_mic.recorder(samplerate=sample_rate, channels=1, blocksize=numframes) as rec:
if tui.resized:
h, w = tui.get_size()
tui.resized = False
left_lines, right_lines, bar_height, num_bars = generate_ui(box, axes, min_freq, max_freq, min_db, max_db, h, w)
band_edges = np.logspace(np.log10(min_freq), np.log10(max_freq), num_bars + 1)
silence = np.repeat(-90.0, num_bars)
silence_time = 0
max_silence_time = max(peak_hold, h/fall_speed) * 2 * 1000
if delay:
buffer = deque()
buffer.extend(np.array_split(np.zeros((delay_frames)), delay_frames // numframes))
while True:
# handle input
# key = tui.read_key()
# if key == 113:
# break
if tui.resized:
h, w = tui.get_size()
tui.resized = False
left_lines, right_lines, bar_height, num_bars = generate_ui(box, axes, min_freq, max_freq, min_db, max_db, h, w)
band_edges = np.logspace(np.log10(min_freq), np.log10(max_freq), num_bars + 1)
silence = np.repeat(-90, num_bars)
# get and process data
if delay:
buffer.append(rec.record(numframes=numframes).flatten())
data = buffer.popleft()
else:
data = rec.record(numframes=numframes).flatten()
# skip calculations if all data is zero
if data.any():
db = log_band_volumes(data, freqs, num_bars, band_edges, reference_max)
silence_time = 0
else:
if silence_time >= max_silence_time:
continue
silence_time += int(args.sample_size)
db = silence
# calculate heights on screen
raw_bar_heights = db_to_height(db, min_db, max_db, bar_height)
# falling bars
now = time.perf_counter()
dt = now - prev_update_time
prev_update_time = now
if prev_bar_heights is None or len(prev_bar_heights) != len(raw_bar_heights):
prev_bar_heights = raw_bar_heights.copy()
else:
max_fall = int(fall_speed * dt)
for i in range(len(raw_bar_heights)):
if raw_bar_heights[i] >= prev_bar_heights[i]:
prev_bar_heights[i] = raw_bar_heights[i]
else:
prev_bar_heights[i] = max(raw_bar_heights[i], prev_bar_heights[i] - max_fall)
bar_heights = prev_bar_heights
# peak marker
if peaks:
if len(peak_heights) != len(bar_heights):
peak_heights = bar_heights.copy()
peak_times = [now] * len(bar_heights)
for i, bh in enumerate(bar_heights):
if bh >= peak_heights[i] - 1:
peak_heights[i] = bh + 1
peak_times[i] = now
elif now - peak_times[i] > peak_hold:
peak_heights[i] = bh
peak_times[i] = now
# draw spectrum
lines = generate_spectrum(left_lines, right_lines, bar_heights, peak_heights, bar_height, bar_character, peak_character, peaks, box, axes, colors)
tui.draw(lines)
except Exception:
if pw_loopback:
pw_loopback.send_signal(signal.SIGINT)
pw_loopback.wait()
tui.leave_raw()
import traceback
sys.exit(f"Error: {traceback.format_exc()}")
def sigint_handler(signum, frame): # noqa
"""Handle Ctrl-C event"""
if pw_loopback:
pw_loopback.send_signal(signal.SIGINT)
pw_loopback.wait()
tui.leave_raw()
sys.exit(0)
def argparser():
"""Setup argument parser for CLI"""
parser = argparse.ArgumentParser(
prog="spectroterm",
description="Curses based terminal spectrum analyzer for currently playing audio",
)
parser._positionals.title = "arguments"
parser.add_argument(
"-a",
"--axes",
action="store_true",
help="draw graph axes",
)
parser.add_argument(
"-b",
"--box",
action="store_true",
help="draw lines at terminal borders",
)
parser.add_argument(
"-c",
"--color",
action="store_true",
help="3 color mode",
)
parser.add_argument(
"-p",
"--peaks",
action="store_true",
help="draw peaks that disappear after some time",
)
parser.add_argument(
"-f",
"--fall-speed",
type=int,
default=40,
help="speed at which bars fall in characters per second",
)
parser.add_argument(
"-o",
"--peak-hold",
type=int,
default=2000,
help="time after which peak will dissapear, in ms",
)
parser.add_argument(
"-r",
"--bar-character",
type=str,
default="█",
help="character used to draw bars",
)
parser.add_argument(
"-k",
"--peak-character",
type=str,
default="_",
help="character used to draw peaks",
)
parser.add_argument(
"--min-freq",
type=int,
default=30,
help="minimum frequency on spectrum graph (x-axis)",
)
parser.add_argument(
"--max-freq",
type=int,
default=16000,
help="maximum frequency on spectrum graph (x-axis)",
)
parser.add_argument(
"--min-db",
type=int,
default=-90,
help="minimum loudness on spectrum graph (y-axis)",
)
parser.add_argument(
"--max-db",
type=int,
default=0,
help="maximum loudness on spectrum graph (y-axis)",
)
parser.add_argument(
"--green",
type=int,
default=46,
help="8bit ANSI color code for green part of bar",
)
parser.add_argument(
"--orange",
type=int,
default=214,
help="8bit ANSI color code for orange part of bar",
)
parser.add_argument(
"--red",
type=int,
default=196,
help="8bit ANSI color code for red part of bar",
)
parser.add_argument(
"--delay",
type=int,
default=0,
help="spectrogram delay for a better sync with sound.",
)
parser.add_argument(
"--bt-delay",
type=int,
default=0,
help="spectrogram delay for auto-detected bluetooth devices.",
)
parser.add_argument(
"--sample-rate",
type=int,
default=44100,
help="loopback device sample rate",
)
parser.add_argument(
"--sample-size",
type=int,
default=50,
help="sample size in ms, higher values will decrease fps",
)
parser.add_argument(
"--reference-max",
type=int,
default=3000,
help="value used to tune maximum loudness of sound",
)
parser.add_argument(
"--pipewire-fix",
action="store_true",
help="pipewire only, connect to output with custom loopback device. This prevents headsets from switching to 'handsfree' mode, which is mono and has lower audio quality. Sometimes this wont work unless sound is playing",
)
parser.add_argument(
"--print-pipewire-node",
action="store_true",
help="will print all currently used pipewire nodes to monitor sound, then exit",
)
parser.add_argument(
"--pipewire-node-id",
type=str,
default=None,
help="ID of custom pipewire node to use. Set this to preferred node if spectroterm is launched before any soud is reproduced. Effective only whith --pipewire-fix. Use 'pw-list -o' to get list of available nodes, or use --print-pipewire-node",
)
parser.add_argument(
"-v",
"--version",
action="version",
version="%(prog)s 0.6.1",
)
return parser.parse_args()
if __name__ == "__main__":
args = argparser()
signal.signal(signal.SIGINT, sigint_handler)
if args.print_pipewire_node:
last_nodes = connect_pipewire(sc.default_speaker().id, only_get_name=True)
for node in last_nodes:
print(node)
sys.exit()
tui.enter_raw()
try:
main(args)
except Exception:
tui.leave_raw()
import traceback
sys.exit(f"Error: {traceback.format_exc()}")
tui.leave_raw()