-
Notifications
You must be signed in to change notification settings - Fork 219
Expand file tree
/
Copy pathcore.py
More file actions
208 lines (159 loc) · 6.14 KB
/
core.py
File metadata and controls
208 lines (159 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
from enum import Enum
import re
import shlex
import subprocess
from enum import Enum
from system import run_and_check, CommandValidationException
# Card profile that get_bluetooth_profile() returns in preference to any other
# usable profile of the card.
# NOTE: the identifier's spelling ("PREFFERED") is kept unchanged because it is
# referenced by name elsewhere in this file.
PREFFERED_PROFILE = 'headset-head-unit'
class BluezAddressType(Enum):
    """Address type of a Bluetooth target.

    BluezTarget stores one of these members, and pair() passes the member's
    integer ``value`` to ``btmgmt pair -t``.
    """

    BR_EDR = 0
    LE_PUBLIC = 1
    LE_RANDOM = 2

    def __str__(self) -> str:
        """Return the member name (e.g. ``"BR_EDR"``) instead of the default Enum text."""
        return self.name
def is_valid_bluezaddress(address: str) -> bool:
    """Tell whether *address* is accepted by the ``Address`` constructor.

    Returns:
        True if ``Address(address)`` succeeds, False if it raises ``ValueError``.
        Any other exception raised by the constructor propagates unchanged.
    """
    try:
        Address(address)
    except ValueError:
        return False
    return True
class Address:
    """A validated Bluetooth device address in ``XX:XX:XX:XX:XX:XX`` form.

    The textual value is normalized to lower case on construction, so string
    conversion, comparison and hashing are all case-insensitive with respect
    to the value originally supplied.
    """

    # Six colon-separated two-digit hex octets; the scoped (?i:...) flag makes
    # the hex digits case-insensitive.
    regexp = re.compile(r"(?i:^([\da-f]{2}:){5}[\da-f]{2}$)")

    def __init__(self, value: str):
        """Validate and store *value*.

        Args:
            value: Candidate address string.

        Raises:
            ValueError: If *value* is not exactly six colon-separated hex octets.
        """
        # fullmatch() rather than match(): "$" on its own also matches just
        # before a trailing newline, which would let "aa:...:ff\n" through.
        if self.regexp.fullmatch(value) is None:
            raise ValueError(f"{value} is not a valid bluetooth address")
        self._address = value.lower()

    def __str__(self) -> str:
        """Return the normalized (lower-case) address."""
        return self._address

    def __repr__(self) -> str:
        """Return a debugging representation such as ``Address('aa:bb:...')``."""
        return f"{type(self).__name__}({self._address!r})"

    def __eq__(self, other) -> bool:
        """Compare against another Address or anything whose ``str()`` is an address.

        The comparison is case-insensitive: *other* is converted with ``str()``
        and lower-cased before being compared to the stored value.
        """
        return self._address == str(other).lower()

    def __hash__(self) -> int:
        """Hash the normalized address.

        Defining ``__eq__`` alone makes a class unhashable; this keeps Address
        usable in sets and as a dict key, with equal addresses hashing equal.
        """
        return hash(self._address)
class BluezTarget:
    """A remote Bluetooth device: a validated address plus its address type.

    Attributes are set once in the constructor. Because instances are
    hashable, treat them as immutable after they have been put in a set or
    used as a dict key.
    """

    # Not referenced by the methods of this class (Address performs the
    # validation); retained unchanged in case other code reads it.
    regexp = re.compile(r"(?i:^([\da-f]{2}:){5}[\da-f]{2}$)")

    def __init__(
        self, address: str, type: int | str | BluezAddressType = BluezAddressType.BR_EDR
    ):
        """Build a target from an address string and an address type.

        Args:
            address: Device address; validated by ``Address``.
            type: A ``BluezAddressType`` member, its integer value, or that
                integer as a decimal string (e.g. ``"1"``). Any other kind of
                value is stored as given.

        Raises:
            ValueError: If *address* is rejected by ``Address``, or if *type*
                is an int/str that does not map to a ``BluezAddressType``.
        """
        self.address = Address(address)
        if isinstance(type, int):
            type = BluezAddressType(type)
        elif isinstance(type, str):
            # Strings are interpreted as the numeric value, not the member name.
            type = BluezAddressType(int(type))
        self.type = type

    def __eq__(self, other):
        """Two targets are equal when both address and address type match."""
        # Returning NotImplemented (instead of touching other.address) lets
        # comparisons with unrelated objects evaluate to False rather than
        # raising AttributeError.
        if not isinstance(other, BluezTarget):
            return NotImplemented
        return self.address == other.address and self.type == other.type

    def __hash__(self):
        """Hash consistently with ``__eq__`` (address text and address type)."""
        # str(self.address) is the normalized lower-case text, so the hash
        # does not depend on Address itself being hashable.
        return hash((str(self.address), self.type))

    def __repr__(self):
        """Return a debugging representation showing address and type."""
        return f"{self.__class__.__name__}({str(self.address)!r}, {self.type!r})"
class BluezIoCaps(Enum):
    """Input/output capability codes used when pairing.

    pair() passes the integer ``value`` of ``NoInputNoOutput`` to
    ``btmgmt pair -c``.
    """

    DisplayOnly = 0
    DisplayYesNo = 1
    KeyboardOnly = 2
    NoInputNoOutput = 3
    KeyboardDisplay = 4
def pair(target: BluezTarget, verbose: bool = False) -> bool:
    """Pair with *target* through ``btmgmt`` using NoInputNoOutput capabilities.

    Args:
        target: Device to pair with; its address and address type are placed
            on the ``btmgmt pair`` command line.
        verbose: Forwarded to ``run_and_check`` for every command.

    Returns:
        True when the pair command passes validation (including the
        "Already Paired" case); False when validation fails and the reported
        output contains "status 0x05 (Authentication Failed)".

    Raises:
        CommandValidationException: For any other validation failure.
    """
    # Configure ourselves to be bondable and pairable, and switch link
    # security off (no need for it here), in that order.
    for setting in ("bondable true", "pairable true", "linksec false"):
        run_and_check(shlex.split(f"sudo btmgmt {setting}"), verbose=verbose)

    def _pairing_output_ok(out) -> bool:
        # "failed" in the output is only acceptable for an already-paired device.
        return "failed" not in out or "Already Paired" in out

    # Try to pair to a device with NoInputNoOutput capabilities
    # TODO: Sometimes this may fail due to agent requesting user confirmation.
    # Registering the following agent may help: "yes | bt-agent -c NoInputNoOutput"
    io_capability = BluezIoCaps.NoInputNoOutput.value
    pair_command = shlex.split(
        f"sudo btmgmt pair -c {io_capability!s} -t {target.type.value!s} {target.address!s}"
    )
    try:
        run_and_check(pair_command, is_valid=_pairing_output_ok, verbose=verbose)
    except CommandValidationException as e:
        # Only an authentication failure is reported as a plain False.
        if "status 0x05 (Authentication Failed)" not in e.output:
            raise
        return False
    return True
def connect(target: BluezTarget, timeout: int = 2, verbose: bool = False):
    """Run a timed ``bluetoothctl`` scan, then ask ``bluetoothctl`` to connect.

    Args:
        target: Device whose address is passed to ``bluetoothctl connect``.
        timeout: Value given to ``bluetoothctl --timeout`` for the scan.
        verbose: Forwarded to ``run_and_check`` for both commands.

    The connect command's output is considered invalid when it contains
    "Failed to connect"; how that is reported is up to ``run_and_check``
    (presumably a CommandValidationException - confirm in the system module).
    """
    scan_command = shlex.split(f"bluetoothctl --timeout {timeout!s} scan on")
    run_and_check(scan_command, verbose=verbose)

    def _connect_output_ok(out) -> bool:
        return "Failed to connect" not in out

    connect_command = shlex.split(f"bluetoothctl connect {target.address!s}")
    run_and_check(connect_command, is_valid=_connect_output_ok, verbose=verbose)
def normalize_address(target: BluezTarget) -> str:
    """Return the target's address in upper case with ':' replaced by '_'.

    Example: ``aa:bb:cc:dd:ee:ff`` becomes ``AA_BB_CC_DD_EE_FF``.
    """
    upper_address = str(target.address).upper()
    return "_".join(upper_address.split(":"))
def to_card_name(target: BluezTarget) -> str:
    """Return ``bluez_card.<ADDR>`` for *target*, with ADDR from normalize_address().

    record() uses this name with ``pactl set-card-profile``.
    """
    return f"bluez_card.{normalize_address(target=target)}"
def to_source_name(target: BluezTarget) -> str:
    """Return ``bluez_input.<ADDR>.0`` for *target*, with ADDR from normalize_address().

    record() passes this name to ``parecord -d``.
    """
    return f"bluez_input.{normalize_address(target=target)}.0"
# A profile line is "<TAB><TAB><name>: <description> (...)". The name is taken
# up to the first ": " so that names which themselves contain ':' (for
# example "output:analog-stereo") are captured whole instead of truncated.
_PROFILE_LINE_RE = re.compile(r"\t\t(.+?): ")


def _find_card_section(pactl_output: str, card_name: str) -> list[str]:
    """Return the lines following ``Name: <card_name>`` up to the next card."""
    wanted = f"Name: {card_name}"
    section: list[str] = []
    inside_card = False
    for line in pactl_output.split("\n"):
        # Exact comparison: a substring test would also accept a card whose
        # name merely starts with card_name.
        if line.strip() == wanted:
            inside_card = True
            section = []
        elif inside_card:
            if line.startswith(("Card #", "\t\tName:")):
                break
            section.append(line)
    return section


def _usable_profiles(card_section: list[str]) -> list[str]:
    """Return, in listing order, profiles that are available and have sources."""
    usable: list[str] = []
    in_profiles_section = False
    for line in card_section:
        if "Profiles:" in line:
            in_profiles_section = True
            continue
        if not in_profiles_section:
            continue
        if not line.startswith("\t\t"):
            # First line that is not double-tab indented ends the profile list.
            break
        match = _PROFILE_LINE_RE.match(line)
        if match is None:
            continue
        line_lower = line.lower()
        is_available = (
            "available: yes" in line_lower or "available: unknown" in line_lower
        )
        has_sources = "sources: 0" not in line_lower
        if is_available and has_sources:
            usable.append(match.group(1).strip())
    return usable


def get_bluetooth_profile(card_name: str, verbose: bool = False) -> str:
    """Choose a profile for *card_name* from the output of ``pactl list cards``.

    A profile is usable when its line reports ``available: yes`` or
    ``available: unknown`` and does not report ``sources: 0``.
    ``PREFFERED_PROFILE`` is returned if it is usable; otherwise the first
    usable profile in listing order is returned.

    NOTE(review): parsing relies on the literal, untranslated markers
    "Name:", "Profiles:" and "available:" in pactl's output - confirm the
    locale pactl runs under if this fails on localized systems.

    Args:
        card_name: Card name as listed by pactl (see ``to_card_name``).
        verbose: When true, print which profile was selected.

    Returns:
        The selected profile name.

    Raises:
        CommandValidationException: If pactl exits non-zero, the card is not
            present in the output, or the card has no usable profile.
    """
    result = subprocess.run(
        ["pactl", "list", "cards"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        raise CommandValidationException(
            "pactl list cards",
            result.stderr or "pactl command failed"
        )

    card_section = _find_card_section(result.stdout, card_name)
    if not card_section:
        raise CommandValidationException(
            f"pactl list cards (search for {card_name})",
            f"Card {card_name} not found in pactl output"
        )

    profiles = _usable_profiles(card_section)
    if not profiles:
        raise CommandValidationException(
            f"pactl list cards (get profiles for {card_name})",
            f"No available profiles found for card {card_name}"
        )

    if PREFFERED_PROFILE in profiles:
        if verbose:
            print(f"Using preferred profile: {PREFFERED_PROFILE}")
        return PREFFERED_PROFILE

    if verbose:
        print(f"Using fallback profile: {profiles[0]} (available: {', '.join(profiles)})")
    return profiles[0]
def record(target: BluezTarget, outfile: str, verbose: bool = True):
    """Select a card profile for *target* and record its source with ``parecord``.

    Args:
        target: Device whose card and source names are derived via
            ``to_card_name`` and ``to_source_name``.
        outfile: Path handed to ``parecord`` as the output file.
        verbose: Forwarded to ``get_bluetooth_profile`` and ``run_and_check``.

    A ``KeyboardInterrupt`` raised while ``parecord`` runs is swallowed, so
    interrupting the recording ends this call without an error. Every other
    exception propagates to the caller.
    """
    source_name = to_source_name(target)
    card_name = to_card_name(target)

    profile = get_bluetooth_profile(card_name, verbose)
    set_profile_command = shlex.split(
        f"pactl set-card-profile {card_name} {profile}"
    )
    run_and_check(set_profile_command, verbose=verbose)

    record_command = ["parecord", "-d", source_name, outfile]
    try:
        run_and_check(record_command, verbose=verbose)
    except KeyboardInterrupt:
        # Deliberately ignored: an interrupt is treated as a normal stop.
        pass
def playback(sink: str, file: str, verbose: bool = True):
    """Play *file* on *sink* by running ``paplay -d <sink> <file>``.

    Args:
        sink: Value passed to ``paplay -d``.
        file: Path of the audio file to play.
        verbose: Forwarded to ``run_and_check``.
    """
    command = ["paplay", "-d", sink, file]
    run_and_check(command, verbose=verbose)