-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathplayer.py
More file actions
348 lines (319 loc) · 13.6 KB
/
player.py
File metadata and controls
348 lines (319 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
import requests
import xml.etree.ElementTree as ET
import time
import threading
import logging
from logging.handlers import RotatingFileHandler
from dataclasses import dataclass
from zeroconf import ServiceBrowser, ServiceListener, Zeroconf
from typing import List, Dict, Tuple, Optional, Union
from dataclasses import dataclass, field
import os
import xml.etree.ElementTree as ET
# Ensure logs directory exists
os.makedirs('logs', exist_ok=True)
# Set up logging
log_file = 'logs/cli.log'
log_handler = RotatingFileHandler(log_file, maxBytes=1024*1024, backupCount=1)
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
log_handler.setFormatter(log_formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)
@dataclass
class PlayerStatus:
etag: str = ''
album: str = ''
artist: str = ''
name: str = ''
state: str = ''
volume: int = 0
service: str = ''
inputId: str = ''
can_move_playback: bool = False
can_seek: bool = False
cursor: int = 0
db: float = 0.0
fn: str = ''
image: str = ''
indexing: int = 0
mid: int = 0
mode: int = 0
mute: bool = False
pid: int = 0
prid: int = 0
quality: int = 0
repeat: int = 0
service_icon: str = ''
service_name: str = ''
shuffle: bool = False
sid: int = 0
sleep: str = ''
song: int = 0
stream_format: str = ''
sync_stat: int = 0
title1: str = ''
title2: str = ''
title3: str = ''
totlen: int = 0
secs: int = 0
@dataclass
class PlayerSource:
text: str
image: str
browse_key: Optional[str]
play_url: Optional[str]
input_type: Optional[str]
type: str
search_key: Optional[str] = None
children: List['PlayerSource'] = field(default_factory=list)
class BlusoundPlayer:
def __init__(self, host_name, name):
self.host_name = host_name
self.name = name
self.base_url = f"http://{self.host_name}:11000"
self.sources: List[PlayerSource] = []
logger.info(f"Initialized BlusoundPlayer: {self.name} at {self.host_name}")
self.initialize_sources()
def request(self, url: str, params: Optional[Dict] = None) -> requests.Response:
full_url = f"{self.base_url}{url}"
logger.info(f"Sending request to: {full_url}")
logger.info(f"Request params: {params}")
response = requests.get(full_url, params=params)
logger.info(f"Response status code: {response.status_code}")
logger.info(f"Response content: {response.text}")
response.raise_for_status()
return response
def capture_sources(self, browse_key: Optional[str] = None) -> List[PlayerSource]:
url = "/Browse"
params = {"key": browse_key} if browse_key else None
try:
response = self.request(url, params)
root = ET.fromstring(response.text)
sources = []
# Capture the search_key from the <browse> element
browse_search_key = root.get('searchKey')
for item in root.findall('item'):
source = PlayerSource(
text=item.get('text', ''),
image=item.get('image', ''),
browse_key=item.get('browseKey'),
play_url=item.get('playURL'),
input_type=item.get('inputType'),
type=item.get('type', ''),
search_key=browse_search_key # Add the search_key to each source
)
sources.append(source)
logger.info(f"Captured {len(sources)} sources for {self.name}")
return sources
except requests.RequestException as e:
logger.error(f"Error capturing sources for {self.name}: {str(e)}")
return []
def get_nested_sources(self, source: PlayerSource) -> None:
if source.browse_key:
nested_sources = self.capture_sources(source.browse_key)
if nested_sources:
source.children = nested_sources
else:
logger.warning(f"No nested sources found for {source.text}")
def initialize_sources(self) -> None:
max_retries = 3
retry_delay = 1 # seconds
for attempt in range(max_retries):
self.sources = self.capture_sources()
if self.sources:
logger.info(f"Initialized {len(self.sources)} sources for {self.name} after {attempt + 1} attempt(s).")
return
logger.warning(f"No sources found for {self.name}. Attempt {attempt + 1}/{max_retries}. Retrying in {retry_delay}s...")
time.sleep(retry_delay)
logger.error(f"Failed to initialize sources for {self.name} after {max_retries} attempts.")
# self.sources will remain empty if all retries fail
def get_status(self, timeout: Optional[int] = None, etag: Optional[str] = None) -> Tuple[bool, Union[PlayerStatus, str]]:
url = "/Status"
params = {}
if timeout:
params['timeout'] = timeout
if etag:
params['etag'] = etag
logger.debug(f"Getting status for {self.name}")
try:
response = self.request(url, params)
root = ET.fromstring(response.text)
def safe_find(element, tag, default=''):
found = element.find(tag)
return found.text if found is not None else default
def safe_int(value, default=0):
try:
return int(value)
except (ValueError, TypeError):
return default
status = PlayerStatus(
etag=root.get('etag', ''),
album=safe_find(root, 'album'),
artist=safe_find(root, 'artist'),
name=safe_find(root, 'title1'),
state=safe_find(root, 'state'),
volume=safe_int(safe_find(root, 'volume')),
service=safe_find(root, 'service'),
inputId=safe_find(root, 'inputId'),
can_move_playback=safe_find(root, 'canMovePlayback') == 'true',
can_seek=safe_int(safe_find(root, 'canSeek')) == 1,
cursor=safe_int(safe_find(root, 'cursor')),
db=float(safe_find(root, 'db', '0')),
fn=safe_find(root, 'fn'),
image=safe_find(root, 'image'),
indexing=safe_int(safe_find(root, 'indexing')),
mid=safe_int(safe_find(root, 'mid')),
mode=safe_int(safe_find(root, 'mode')),
mute=safe_int(safe_find(root, 'mute')) == 1,
pid=safe_int(safe_find(root, 'pid')),
prid=safe_int(safe_find(root, 'prid')),
quality=safe_int(safe_find(root, 'quality')),
repeat=safe_int(safe_find(root, 'repeat')),
service_icon=safe_find(root, 'serviceIcon'),
service_name=safe_find(root, 'serviceName'),
shuffle=safe_int(safe_find(root, 'shuffle')) == 1,
sid=safe_int(safe_find(root, 'sid')),
sleep=safe_find(root, 'sleep'),
song=safe_int(safe_find(root, 'song')),
stream_format=safe_find(root, 'streamFormat'),
sync_stat=safe_int(safe_find(root, 'syncStat')),
title1=safe_find(root, 'title1'),
title2=safe_find(root, 'title2'),
title3=safe_find(root, 'title3'),
totlen=safe_int(safe_find(root, 'totlen')),
secs=safe_int(safe_find(root, 'secs'))
)
logger.info(f"Status for {self.name}: {status}")
return True, status
except requests.RequestException as e:
logger.error(f"Error getting status for {self.name}: {str(e)}")
return False, str(e)
def set_volume(self, volume: int) -> Tuple[bool, str]:
url = "/Volume"
params = {'level': volume}
logger.info(f"Setting volume for {self.name} to {volume}")
try:
self.request(url, params)
return True, "Volume set successfully"
except requests.RequestException as e:
logger.error(f"Error setting volume for {self.name}: {str(e)}")
return False, str(e)
def toggle_play_pause(self) -> Tuple[bool, str]:
url = "/Pause"
params = {'toggle': 1}
logger.info(f"Toggling play/pause for {self.name}")
try:
self.request(url, params)
return True, "Playback toggled successfully"
except requests.RequestException as e:
logger.error(f"Error toggling play/pause for {self.name}: {str(e)}")
return False, str(e)
def skip(self) -> Tuple[bool, str]:
url = "/Skip"
logger.info(f"Skipping track on {self.name}")
try:
self.request(url)
return True, "Skipped to next track successfully"
except requests.RequestException as e:
logger.error(f"Error skipping track on {self.name}: {str(e)}")
return False, str(e)
def back(self) -> Tuple[bool, str]:
url = "/Back"
logger.info(f"Going back a track on {self.name}")
try:
self.request(url)
return True, "Went back to previous track successfully"
except requests.RequestException as e:
logger.error(f"Error going back a track on {self.name}: {str(e)}")
return False, str(e)
def select_input(self, source: PlayerSource) -> Tuple[bool, str]:
if source.play_url:
url = source.play_url
elif source.browse_key:
url = "/Browse"
params = {'key': source.browse_key}
else:
return False, "Invalid source"
logger.info(f"Selecting source for {self.name}: {source.text}")
try:
self.request(url, params if source.browse_key else None)
return True, f"{source.text} selected successfully"
except requests.RequestException as e:
logger.error(f"Error selecting source for {self.name}: {str(e)}")
return False, str(e)
def search(self, search_key: str, search_string: str) -> List[PlayerSource]:
url = "/Browse"
params = {'key': search_key, 'q': search_string}
logger.info(f"Searching for '{search_string}' with key '{search_key}' on {self.name}")
try:
response = self.request(url, params)
root = ET.fromstring(response.text)
sources = []
for item in root.findall('item'):
text = item.get('text', '').strip()
browse_key = item.get('browseKey')
if text == "Library" and browse_key:
# Follow up with a second request for Library search
library_response = self.request(url, {'key': browse_key})
library_root = ET.fromstring(library_response.text)
for library_item in library_root.findall('item'):
source = PlayerSource(
text=library_item.get('text', '').strip(),
image=library_item.get('image', ''),
browse_key=library_item.get('browseKey'),
play_url=library_item.get('playURL'),
input_type=library_item.get('inputType'),
type=library_item.get('type', ''),
search_key=library_root.get('searchKey')
)
sources.append(source)
else:
source = PlayerSource(
text=text,
image=item.get('image', ''),
browse_key=browse_key,
play_url=item.get('playURL'),
input_type=item.get('inputType'),
type=item.get('type', ''),
search_key=root.get('searchKey')
)
sources.append(source)
logger.info(f"Found {len(sources)} results for search '{search_string}' on {self.name}")
return sources
except requests.RequestException as e:
logger.error(f"Error searching on {self.name}: {str(e)}")
return []
class MyListener(ServiceListener):
def __init__(self):
self.players = []
def add_service(self, zeroconf: Zeroconf, type, name):
info = zeroconf.get_service_info(type, name)
ipv4 = [addr for addr in info.parsed_addresses() if addr.count('.') == 3][0]
player = BlusoundPlayer(host_name=ipv4, name=info.server)
self.players.append(player)
logger.info(f"Discovered new player: {player.name} at {player.host_name}")
def remove_service(self, zeroconf, type, name):
self.players = [p for p in self.players if p.name != name]
logger.info(f"Removed player: {name}")
def update_service(self, zeroconf, type, name):
logger.info(f"Updated service: {name}")
def discover(players):
logger.info("Starting discovery process")
zeroconf = Zeroconf()
listener = MyListener()
ServiceBrowser(zeroconf, "_musc._tcp.local.", listener)
try:
while True:
time.sleep(1)
players[:] = listener.players
finally:
zeroconf.close()
logger.info("Discovery process ended")
def threaded_discover():
logger.info("Starting threaded discovery")
players = []
discovery_thread = threading.Thread(target=discover, args=(players,), daemon=True)
discovery_thread.start()
return players