Skip to content

Commit 08ddb7e

Browse files
committed
refactor: replace FuturesUnordered
1 parent 5c648a2 commit 08ddb7e

1 file changed

Lines changed: 18 additions & 41 deletions

File tree

src/context/mod.rs

Lines changed: 18 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{
44
};
55

66
use eyre::{Context as _, Report, Result};
7-
use futures_util::{stream::FuturesUnordered, StreamExt as _};
7+
use futures_util::StreamExt as _;
88
use rosu_v2::Osu;
99
use tokio::time::{interval, sleep};
1010

@@ -239,57 +239,34 @@ impl Context {
239239
}
240240
}
241241

242-
let mut handle_user_result = |user_id, res| {
243-
let mut user = match res {
244-
Ok(user) => user,
245-
Err(err) => {
246-
error!(err = ?Report::new(err), "Failed to request user {user_id} from osu!api");
247-
248-
return;
249-
}
250-
};
251-
252-
// Process badges if required
253-
if check_badges {
254-
if let OsuUser::Available(ref mut user) = user {
255-
for badge in user.badges.iter_mut() {
256-
badges_incoming.push(user.user_id, badge, &mut badge_name_buf);
257-
}
258-
}
259-
}
260-
261-
users.push(user);
262-
};
263-
264242
let jobs = user_ids
265243
.into_iter()
266244
.zip(1..)
267245
.map(async |(user_id, i)| (i, user_id, self.request_osu_user(user_id).await));
268246

269-
let mut futures = FuturesUnordered::new();
247+
const CONCURRENT_USERS: usize = 4;
248+
249+
let mut stream = futures_util::stream::iter(jobs).buffer_unordered(CONCURRENT_USERS);
270250

271251
// Request osu! user data for all users for all modes.
272252
// The core loop and very expensive.
273-
for job in jobs {
274-
const CONCURRENT_USERS: usize = 4;
275-
276-
while futures.len() >= CONCURRENT_USERS {
277-
let Some((i, user_id, res)) = futures.next().await else {
278-
continue;
279-
};
280-
281-
handle_user_result(user_id, res);
253+
while let Some((i, user_id, res)) = stream.next().await {
254+
match res.map_err(Report::new) {
255+
Ok(mut user) => {
256+
// Process badges if required
257+
if check_badges {
258+
if let OsuUser::Available(ref mut user) = user {
259+
for badge in user.badges.iter_mut() {
260+
badges_incoming.push(user.user_id, badge, &mut badge_name_buf);
261+
}
262+
}
263+
}
282264

283-
self.update_progress(i, len, args, &mut eta, &mut progress)
284-
.await;
265+
users.push(user);
266+
}
267+
Err(err) => error!(?err, "Failed to request user {user_id} from osu!api"),
285268
}
286269

287-
futures.push(job);
288-
}
289-
290-
while let Some((i, user_id, res)) = futures.next().await {
291-
handle_user_result(user_id, res);
292-
293270
self.update_progress(i, len, args, &mut eta, &mut progress)
294271
.await;
295272
}

0 commit comments

Comments
 (0)