Skip to content
This repository was archived by the owner on Feb 7, 2026. It is now read-only.

Commit e236f30

Browse files
author
Till Wegmueller
committed
Convert trait methods to use &self instead of &mut self, introduce Mutex for interior mutability, optimize HTTP client creation, and implement parallel payload processing using Rayon.
1 parent 0de84b8 commit e236f30

8 files changed

Lines changed: 143 additions & 55 deletions

File tree

Cargo.lock

Lines changed: 40 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libips/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ serde = { version = "1.0.207", features = ["derive"] }
3434
serde_json = "1.0.124"
3535
flate2 = "1.0.28"
3636
lz4 = "1.24.0"
37-
base64 = "0.22"
3837
semver = { version = "1.0.20", features = ["serde"] }
3938
diff-struct = "0.5.3"
4039
chrono = "0.4.41"
@@ -44,6 +43,7 @@ rusqlite = { version = "0.31", default-features = false }
4443
rust-ini = "0.21"
4544
reqwest = { version = "0.12", features = ["blocking", "json", "gzip", "deflate"] }
4645
resolvo = "0.10"
46+
rayon = "1.11"
4747

4848
[features]
4949
default = ["bundled-sqlite"]

libips/src/recv.rs

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,23 @@ use crate::repository::{
99
FileBackend, NoopProgressReporter, ProgressInfo, ProgressReporter, ReadableRepository,
1010
RepositoryError, Result, WritableRepository,
1111
};
12+
use rayon::prelude::*;
1213
use std::collections::HashSet;
14+
use std::sync::{Arc, Mutex};
1315
use tempfile::tempdir;
1416
use tracing::{debug, info};
1517

1618
/// PackageReceiver handles downloading packages from a source repository
1719
/// and storing them in a destination repository.
1820
pub struct PackageReceiver<'a, S: ReadableRepository> {
19-
source: &'a mut S,
21+
source: &'a S,
2022
dest: FileBackend,
2123
progress: Option<&'a dyn ProgressReporter>,
2224
}
2325

24-
impl<'a, S: ReadableRepository> PackageReceiver<'a, S> {
26+
impl<'a, S: ReadableRepository + Sync> PackageReceiver<'a, S> {
2527
/// Create a new PackageReceiver
26-
pub fn new(source: &'a mut S, dest: FileBackend) -> Self {
28+
pub fn new(source: &'a S, dest: FileBackend) -> Self {
2729
Self {
2830
source,
2931
dest,
@@ -215,28 +217,53 @@ impl<'a, S: ReadableRepository> PackageReceiver<'a, S> {
215217
.collect();
216218
let total_files = payload_files.len() as u64;
217219

218-
for (i, file) in payload_files.into_iter().enumerate() {
219-
if let Some(payload) = &file.payload {
220-
let files_done = (i + 1) as u64;
221-
let digest = &payload.primary_identifier.hash;
220+
// Download all payloads in parallel
221+
let files_done = Arc::new(Mutex::new(0u64));
222+
let publisher_str = publisher.to_string();
223+
let fmri_name = fmri.name.clone();
224+
let temp_dir_path = temp_dir.path().to_path_buf();
222225

223-
progress.update(
224-
&ProgressInfo::new(format!("Receiving payloads for {}", fmri.name))
225-
.with_total(total_files)
226-
.with_current(files_done)
227-
.with_context(format!("Payload: {}", digest)),
228-
);
226+
let download_results: std::result::Result<Vec<_>, RepositoryError> = payload_files
227+
.par_iter()
228+
.map(|file| {
229+
let payload = file.payload.as_ref().unwrap();
230+
let digest = &payload.primary_identifier.hash;
231+
let temp_file_path = temp_dir_path.join(digest);
229232

230-
let temp_file_path = temp_dir.path().join(digest);
231233
debug!(
232234
"Fetching payload {} to {}",
233235
digest,
234236
temp_file_path.display()
235237
);
238+
239+
// Download the payload (now works with &self)
236240
self.source
237-
.fetch_payload(publisher, digest, &temp_file_path)?;
238-
txn.add_file(file.clone(), &temp_file_path)?;
239-
}
241+
.fetch_payload(&publisher_str, digest, &temp_file_path)?;
242+
243+
// Update progress atomically
244+
let current_count = {
245+
let mut count = files_done.lock()
246+
.map_err(|e| RepositoryError::Other(format!("Failed to lock progress counter: {}", e)))?;
247+
*count += 1;
248+
*count
249+
};
250+
251+
progress.update(
252+
&ProgressInfo::new(format!("Receiving payloads for {}", fmri_name))
253+
.with_total(total_files)
254+
.with_current(current_count)
255+
.with_context(format!("Payload: {}", digest)),
256+
);
257+
258+
Ok((file, temp_file_path))
259+
})
260+
.collect();
261+
262+
let download_info = download_results?;
263+
264+
// Add all files to the transaction
265+
for (file, temp_file_path) in download_info {
266+
txn.add_file((*file).clone(), &temp_file_path)?;
240267
}
241268

242269
txn.update_manifest(manifest.clone());
@@ -279,7 +306,7 @@ mod tests {
279306
// Create dest repo
280307
let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?;
281308

282-
let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo);
309+
let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
283310
receiver.receive(Some("test"), &[Fmri::new("pkgA")], false)?;
284311

285312
// Verify dest repo has the package
@@ -318,7 +345,7 @@ mod tests {
318345
// Create dest repo
319346
let dest_repo = FileBackend::create(dest_dir.path(), RepositoryVersion::V4)?;
320347

321-
let mut receiver = PackageReceiver::new(&mut source_repo, dest_repo);
348+
let mut receiver = PackageReceiver::new(&source_repo, dest_repo);
322349
receiver.receive(Some("test"), &[Fmri::new("pkgA")], false)?;
323350

324351
// Verify dest repo has the package and the manifest is in IPS format

libips/src/repository/file_backend.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::fs::File;
1616
use std::io::{Read, Write};
1717
use std::path::{Path, PathBuf};
1818
use std::str::FromStr;
19+
use std::sync::Mutex;
1920
use std::time::{SystemTime, UNIX_EPOCH};
2021
use tracing::{debug, error, info};
2122
use walkdir::WalkDir;
@@ -358,11 +359,11 @@ pub struct FileBackend {
358359
pub path: PathBuf,
359360
pub config: RepositoryConfig,
360361
/// Catalog manager for handling catalog operations
361-
/// Uses RefCell for interior mutability to allow mutation through immutable references
362-
catalog_manager: Option<std::cell::RefCell<crate::repository::catalog::CatalogManager>>,
362+
/// Uses Mutex for interior mutability to allow mutation through immutable references (thread-safe)
363+
catalog_manager: Option<Mutex<crate::repository::catalog::CatalogManager>>,
363364
/// Manager for obsoleted packages
364365
obsoleted_manager:
365-
Option<std::cell::RefCell<crate::repository::obsoleted::ObsoletedPackageManager>>,
366+
Option<Mutex<crate::repository::obsoleted::ObsoletedPackageManager>>,
366367
}
367368

368369
/// Transaction for publishing packages
@@ -1467,7 +1468,7 @@ impl ReadableRepository for FileBackend {
14671468
Ok(package_contents)
14681469
}
14691470

1470-
fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()> {
1471+
fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()> {
14711472
// Parse digest; supports both raw hash and source:algorithm:hash
14721473
let parsed = match Digest::from_str(digest) {
14731474
Ok(d) => d,
@@ -1549,7 +1550,7 @@ impl ReadableRepository for FileBackend {
15491550
}
15501551

15511552
fn fetch_manifest(
1552-
&mut self,
1553+
&self,
15531554
publisher: &str,
15541555
fmri: &crate::fmri::Fmri,
15551556
) -> Result<crate::actions::Manifest> {
@@ -1596,7 +1597,7 @@ impl ReadableRepository for FileBackend {
15961597
)))
15971598
}
15981599

1599-
fn fetch_manifest_text(&mut self, publisher: &str, fmri: &Fmri) -> Result<String> {
1600+
fn fetch_manifest_text(&self, publisher: &str, fmri: &Fmri) -> Result<String> {
16001601
// Require a concrete version
16011602
let version = fmri.version();
16021603
if version.is_empty() {
@@ -2387,8 +2388,9 @@ impl FileBackend {
23872388
&self.obsoleted_manager
23882389
{
23892390
obsoleted_manager
2390-
.borrow()
2391-
.is_obsoleted(publisher, &final_fmri)
2391+
.lock()
2392+
.map(|mgr| mgr.is_obsoleted(publisher, &final_fmri))
2393+
.unwrap_or(false)
23922394
} else {
23932395
false
23942396
};
@@ -2853,40 +2855,46 @@ impl FileBackend {
28532855
/// Get or initialize the catalog manager
28542856
///
28552857
/// This method returns a mutable reference to the catalog manager.
2856-
/// It uses interior mutability with RefCell to allow mutation through an immutable reference.
2858+
/// It uses interior mutability with Mutex to allow mutation through an immutable reference.
28572859
///
28582860
/// The catalog manager is specific to the given publisher.
28592861
pub fn get_catalog_manager(
28602862
&mut self,
28612863
publisher: &str,
2862-
) -> Result<std::cell::RefMut<'_, crate::repository::catalog::CatalogManager>> {
2864+
) -> Result<std::sync::MutexGuard<'_, crate::repository::catalog::CatalogManager>> {
28632865
if self.catalog_manager.is_none() {
28642866
let publisher_dir = self.path.join("publisher");
28652867
let manager =
28662868
crate::repository::catalog::CatalogManager::new(&publisher_dir, publisher)?;
2867-
let refcell = std::cell::RefCell::new(manager);
2868-
self.catalog_manager = Some(refcell);
2869+
let mutex = Mutex::new(manager);
2870+
self.catalog_manager = Some(mutex);
28692871
}
28702872

2871-
// This is safe because we just checked that catalog_manager is Some
2872-
Ok(self.catalog_manager.as_ref().unwrap().borrow_mut())
2873+
self.catalog_manager
2874+
.as_ref()
2875+
.ok_or_else(|| RepositoryError::Other("Catalog manager not initialized".to_string()))?
2876+
.lock()
2877+
.map_err(|e| RepositoryError::Other(format!("Failed to lock catalog manager: {}", e)))
28732878
}
28742879

28752880
/// Get or initialize the obsoleted package manager
28762881
///
28772882
/// This method returns a mutable reference to the obsoleted package manager.
2878-
/// It uses interior mutability with RefCell to allow mutation through an immutable reference.
2883+
/// It uses interior mutability with Mutex to allow mutation through an immutable reference.
28792884
pub fn get_obsoleted_manager(
28802885
&mut self,
2881-
) -> Result<std::cell::RefMut<'_, crate::repository::obsoleted::ObsoletedPackageManager>> {
2886+
) -> Result<std::sync::MutexGuard<'_, crate::repository::obsoleted::ObsoletedPackageManager>> {
28822887
if self.obsoleted_manager.is_none() {
28832888
let manager = crate::repository::obsoleted::ObsoletedPackageManager::new(&self.path);
2884-
let refcell = std::cell::RefCell::new(manager);
2885-
self.obsoleted_manager = Some(refcell);
2889+
let mutex = Mutex::new(manager);
2890+
self.obsoleted_manager = Some(mutex);
28862891
}
28872892

2888-
// This is safe because we just checked that obsoleted_manager is Some
2889-
Ok(self.obsoleted_manager.as_ref().unwrap().borrow_mut())
2893+
self.obsoleted_manager
2894+
.as_ref()
2895+
.ok_or_else(|| RepositoryError::Other("Obsoleted manager not initialized".to_string()))?
2896+
.lock()
2897+
.map_err(|e| RepositoryError::Other(format!("Failed to lock obsoleted manager: {}", e)))
28902898
}
28912899

28922900
/// URL encode a string for use in a filename

libips/src/repository/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -334,19 +334,19 @@ pub trait ReadableRepository {
334334
/// Fetch a content payload identified by digest into the destination path.
335335
/// Implementations should download/copy the payload to a temporary path,
336336
/// verify integrity, and atomically move into `dest`.
337-
fn fetch_payload(&mut self, publisher: &str, digest: &str, dest: &Path) -> Result<()>;
337+
fn fetch_payload(&self, publisher: &str, digest: &str, dest: &Path) -> Result<()>;
338338

339339
/// Fetch a package manifest by FMRI from the repository.
340340
/// Implementations should retrieve and parse the manifest for the given
341341
/// publisher and fully-qualified FMRI (name@version).
342342
fn fetch_manifest(
343-
&mut self,
343+
&self,
344344
publisher: &str,
345345
fmri: &crate::fmri::Fmri,
346346
) -> Result<crate::actions::Manifest>;
347347

348348
/// Fetch a package manifest as raw text by FMRI from the repository.
349-
fn fetch_manifest_text(&mut self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String>;
349+
fn fetch_manifest_text(&self, publisher: &str, fmri: &crate::fmri::Fmri) -> Result<String>;
350350

351351
/// Search for packages in the repository
352352
///

libips/src/repository/progress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::fmt;
3535
/// }
3636
/// }
3737
/// ```
38-
pub trait ProgressReporter {
38+
pub trait ProgressReporter: Send + Sync {
3939
/// Called when an operation starts.
4040
///
4141
/// # Arguments

0 commit comments

Comments
 (0)