Skip to content

Commit 0cbe5ce

Browse files
committed
pipeline cluster playlist create/add is now async per cluster job
1 parent c4dd022 commit 0cbe5ce

1 file changed

Lines changed: 72 additions & 39 deletions

File tree

Backend/playlist_processing.py

Lines changed: 72 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,17 @@
4444

4545
def _env_positive_int(name: str, default_value: int) -> int:
4646
"""Parse positive integer env values with safe fallback."""
47-
raw_value = os.getenv(name, str(default_value))
47+
raw_value = os.getenv(name)
48+
if raw_value is None:
49+
return default_value
50+
candidate = raw_value.strip()
51+
if not candidate:
52+
return default_value
4853
try:
49-
parsed = int(raw_value)
54+
parsed = int(candidate)
5055
except ValueError:
5156
return default_value
52-
return max(1, parsed)
57+
return parsed if parsed > 0 else 1
5358

5459

5560
FETCH_PAGE_CONCURRENCY = _env_positive_int("PLAYLIST_FETCH_PAGE_CONCURRENCY", 8)
@@ -180,64 +185,89 @@ async def create_and_populate_cluster_playlists(
180185
print("No eligible clusters to create playlists for.")
181186
return
182187

183-
create_start = time.time()
188+
pipeline_start = time.time()
184189
create_semaphore = asyncio.Semaphore(CREATE_PLAYLIST_CONCURRENCY)
190+
add_semaphore = asyncio.Semaphore(ADD_SONGS_CONCURRENCY)
191+
192+
async def process_cluster(candidate):
193+
create_duration = 0.0
194+
add_duration = 0.0
195+
add_calls = 0
196+
add_calls_successful = 0
197+
tracks_added = 0
185198

186-
async def create_cluster_playlist(candidate):
187199
async with create_semaphore:
200+
create_call_start = time.time()
188201
created_playlist_id = await create_playlist(
189202
user_id,
190203
auth_token,
191204
candidate["playlist_title"],
192205
candidate["playlist_description"],
193206
)
194-
if not created_playlist_id:
195-
return None
196-
return (candidate, created_playlist_id)
207+
create_duration += time.time() - create_call_start
197208

198-
created_entries = await asyncio.gather(
199-
*(create_cluster_playlist(candidate) for candidate in cluster_candidates)
200-
)
201-
created_entries = [entry for entry in created_entries if entry is not None]
202-
create_duration = time.time() - create_start
209+
if not created_playlist_id:
210+
return {
211+
"cluster_created": 0,
212+
"create_calls": 1,
213+
"create_time": create_duration,
214+
"add_calls": add_calls,
215+
"add_calls_successful": add_calls_successful,
216+
"tracks_added": tracks_added,
217+
"add_time": add_duration,
218+
}
203219

204-
add_jobs = []
205-
total_tracks_to_add = 0
206-
for candidate, created_playlist_id in created_entries:
207220
track_ids = candidate["track_ids"]
208221
slices = calc_slices(len(track_ids))
209222
for index in range(0, slices * 100, 100):
210223
track_slice = track_ids[index : index + 100]
211224
track_uris = [f"spotify:track:{track_id}" for track_id in track_slice]
212-
total_tracks_to_add += len(track_uris)
213-
add_jobs.append((created_playlist_id, track_uris))
225+
tracks_added += len(track_uris)
226+
add_calls += 1
214227

215-
add_duration = 0.0
216-
successful_add_calls = 0
217-
if add_jobs:
218-
add_start = time.time()
219-
add_semaphore = asyncio.Semaphore(ADD_SONGS_CONCURRENCY)
220-
221-
async def add_song_batch(job):
222-
playlist_id, track_uris = job
223228
async with add_semaphore:
224229
# Avoid "Index out of bounds" races by appending without explicit position.
225-
return await add_songs(playlist_id, track_uris, auth_token)
226-
227-
add_results = await asyncio.gather(*(add_song_batch(job) for job in add_jobs))
228-
successful_add_calls = sum(1 for result in add_results if result)
229-
add_duration = time.time() - add_start
230+
add_call_start = time.time()
231+
add_result = await add_songs(created_playlist_id, track_uris, auth_token)
232+
add_duration += time.time() - add_call_start
233+
if add_result:
234+
add_calls_successful += 1
235+
236+
return {
237+
"cluster_created": 1,
238+
"create_calls": 1,
239+
"create_time": create_duration,
240+
"add_calls": add_calls,
241+
"add_calls_successful": add_calls_successful,
242+
"tracks_added": tracks_added,
243+
"add_time": add_duration,
244+
}
245+
246+
cluster_results = await asyncio.gather(
247+
*(process_cluster(candidate) for candidate in cluster_candidates)
248+
)
249+
clusters_created = sum(result["cluster_created"] for result in cluster_results)
250+
create_calls = sum(result["create_calls"] for result in cluster_results)
251+
create_duration = sum(result["create_time"] for result in cluster_results)
252+
add_calls = sum(result["add_calls"] for result in cluster_results)
253+
successful_add_calls = sum(
254+
result["add_calls_successful"] for result in cluster_results
255+
)
256+
total_tracks_to_add = sum(result["tracks_added"] for result in cluster_results)
257+
add_duration = sum(result["add_time"] for result in cluster_results)
258+
pipeline_duration = time.time() - pipeline_start
230259

231260
print(
232261
"Playlist write stats:",
233262
f"clusters_considered={len(sorted_clusters)},",
234-
f"clusters_created={len(created_entries)},",
235-
f"create_calls={len(cluster_candidates)},",
263+
f"clusters_created={clusters_created},",
264+
f"create_calls={create_calls},",
236265
f"create_time={create_duration:.2f}s,",
237-
f"add_calls={len(add_jobs)},",
266+
f"add_calls={add_calls},",
238267
f"add_calls_successful={successful_add_calls},",
239268
f"tracks_added={total_tracks_to_add},",
240-
f"add_time={add_duration:.2f}s",
269+
f"add_time={add_duration:.2f}s,",
270+
f"pipeline_time={pipeline_duration:.2f}s",
241271
)
242272

243273
log_step_time("Creating and populating cluster playlists", start_time)
@@ -248,15 +278,18 @@ async def process_single_playlist(auth_token, playlist_id, user_id):
248278
start_time = time.time()
249279
print(f"Processing {playlist_id}...")
250280
playlist_name_start = time.time()
251-
playlist_name = get_playlist_name(playlist_id, auth_token)
252-
log_step_time(
253-
f"Fetch source playlist name ({playlist_id})",
254-
playlist_name_start,
281+
playlist_name_task = asyncio.create_task(
282+
asyncio.to_thread(get_playlist_name, playlist_id, auth_token)
255283
)
256284

257285
track_fetch_start = time.time()
258286
track_ids = await get_playlist_track_ids(auth_token, playlist_id)
259287
log_step_time(f"Collect playlist tracks ({playlist_id})", track_fetch_start)
288+
playlist_name = await playlist_name_task
289+
log_step_time(
290+
f"Fetch source playlist name ({playlist_id})",
291+
playlist_name_start,
292+
)
260293
if not track_ids:
261294
print(f"No tracks found for playlist {playlist_id}")
262295
return

0 commit comments

Comments
 (0)