|
1 | 1 | //! Manage PostgreSQL archives |
2 | 2 | #![allow(dead_code)] |
3 | 3 |
|
4 | | -use crate::error::Error::Unexpected; |
5 | 4 | use crate::error::Result; |
6 | | -use crate::repository; |
7 | | -use flate2::bufread::GzDecoder; |
8 | | -use human_bytes::human_bytes; |
9 | | -use num_format::{Locale, ToFormattedString}; |
| 5 | +use crate::{extractor, repository}; |
10 | 6 | use semver::{Version, VersionReq}; |
11 | | -use std::fs::{create_dir_all, remove_dir_all, remove_file, rename, File}; |
12 | | -use std::io::{copy, BufReader, Cursor}; |
13 | | -use std::path::{Path, PathBuf}; |
14 | | -use std::thread::sleep; |
15 | | -use std::time::Duration; |
16 | | -use tar::Archive; |
17 | | -use tracing::{debug, instrument, warn}; |
| 7 | +use std::path::Path; |
| 8 | +use tracing::instrument; |
18 | 9 |
|
19 | 10 | pub const THESEUS_POSTGRESQL_BINARIES_URL: &str = |
20 | 11 | "https://github.com/theseus-rs/postgresql-binaries"; |
@@ -47,164 +38,15 @@ pub async fn get_archive(url: &str, version_req: &VersionReq) -> Result<(Version |
47 | 38 | Ok((version, bytes)) |
48 | 39 | } |
49 | 40 |
|
50 | | -/// Acquires a lock file in the [out_dir](Path) to prevent multiple processes from extracting the |
51 | | -/// archive at the same time. |
52 | | -/// |
53 | | -/// # Errors |
54 | | -/// * If the lock file cannot be acquired. |
55 | | -#[instrument(level = "debug")] |
56 | | -fn acquire_lock(out_dir: &Path) -> Result<PathBuf> { |
57 | | - let lock_file = out_dir.join("postgresql-archive.lock"); |
58 | | - |
59 | | - if lock_file.is_file() { |
60 | | - let metadata = lock_file.metadata()?; |
61 | | - let created = metadata.created()?; |
62 | | - |
63 | | - if created.elapsed()?.as_secs() > 300 { |
64 | | - warn!( |
65 | | - "Stale lock file detected; removing file to attempt process recovery: {}", |
66 | | - lock_file.to_string_lossy() |
67 | | - ); |
68 | | - remove_file(&lock_file)?; |
69 | | - } |
70 | | - } |
71 | | - |
72 | | - debug!( |
73 | | - "Attempting to acquire lock: {}", |
74 | | - lock_file.to_string_lossy() |
75 | | - ); |
76 | | - |
77 | | - for _ in 0..30 { |
78 | | - let lock = std::fs::OpenOptions::new() |
79 | | - .create(true) |
80 | | - .truncate(true) |
81 | | - .write(true) |
82 | | - .open(&lock_file); |
83 | | - |
84 | | - match lock { |
85 | | - Ok(_) => { |
86 | | - debug!("Lock acquired: {}", lock_file.to_string_lossy()); |
87 | | - return Ok(lock_file); |
88 | | - } |
89 | | - Err(error) => { |
90 | | - warn!("unable to acquire lock: {error}"); |
91 | | - sleep(Duration::from_secs(1)); |
92 | | - } |
93 | | - } |
94 | | - } |
95 | | - |
96 | | - Err(Unexpected("Failed to acquire lock".to_string())) |
97 | | -} |
98 | | - |
99 | 41 | /// Extracts the compressed tar `bytes` to the [out_dir](Path). |
100 | 42 | /// |
101 | 43 | /// # Errors |
102 | 44 | /// Returns an error if the extraction fails. |
103 | 45 | #[allow(clippy::cast_precision_loss)] |
104 | 46 | #[instrument(skip(bytes))] |
105 | | -pub async fn extract(bytes: &Vec<u8>, out_dir: &Path) -> Result<()> { |
106 | | - let input = BufReader::new(Cursor::new(bytes)); |
107 | | - let decoder = GzDecoder::new(input); |
108 | | - let mut archive = Archive::new(decoder); |
109 | | - let mut files = 0; |
110 | | - let mut extracted_bytes = 0; |
111 | | - |
112 | | - let parent_dir = if let Some(parent) = out_dir.parent() { |
113 | | - parent |
114 | | - } else { |
115 | | - debug!("No parent directory for {}", out_dir.to_string_lossy()); |
116 | | - out_dir |
117 | | - }; |
118 | | - |
119 | | - create_dir_all(parent_dir)?; |
120 | | - |
121 | | - let lock_file = acquire_lock(parent_dir)?; |
122 | | - // If the directory already exists, then the archive has already been |
123 | | - // extracted by another process. |
124 | | - if out_dir.exists() { |
125 | | - debug!( |
126 | | - "Directory already exists {}; skipping extraction: ", |
127 | | - out_dir.to_string_lossy() |
128 | | - ); |
129 | | - remove_file(&lock_file)?; |
130 | | - return Ok(()); |
131 | | - } |
132 | | - |
133 | | - let extract_dir = tempfile::tempdir_in(parent_dir)?.into_path(); |
134 | | - debug!("Extracting archive to {}", extract_dir.to_string_lossy()); |
135 | | - |
136 | | - for archive_entry in archive.entries()? { |
137 | | - let mut entry = archive_entry?; |
138 | | - let entry_header = entry.header(); |
139 | | - let entry_type = entry_header.entry_type(); |
140 | | - let entry_size = entry_header.size()?; |
141 | | - #[cfg(unix)] |
142 | | - let file_mode = entry_header.mode()?; |
143 | | - |
144 | | - let entry_header_path = entry_header.path()?.to_path_buf(); |
145 | | - let prefix = match entry_header_path.components().next() { |
146 | | - Some(component) => component.as_os_str().to_str().unwrap_or_default(), |
147 | | - None => { |
148 | | - return Err(Unexpected( |
149 | | - "Failed to get file header path prefix".to_string(), |
150 | | - )); |
151 | | - } |
152 | | - }; |
153 | | - let stripped_entry_header_path = entry_header_path.strip_prefix(prefix)?.to_path_buf(); |
154 | | - let mut entry_name = extract_dir.clone(); |
155 | | - entry_name.push(stripped_entry_header_path); |
156 | | - |
157 | | - if entry_type.is_dir() || entry_name.is_dir() { |
158 | | - create_dir_all(&entry_name)?; |
159 | | - } else if entry_type.is_file() { |
160 | | - let mut output_file = File::create(&entry_name)?; |
161 | | - copy(&mut entry, &mut output_file)?; |
162 | | - |
163 | | - files += 1; |
164 | | - extracted_bytes += entry_size; |
165 | | - |
166 | | - #[cfg(unix)] |
167 | | - { |
168 | | - use std::os::unix::fs::PermissionsExt; |
169 | | - output_file.set_permissions(std::fs::Permissions::from_mode(file_mode))?; |
170 | | - } |
171 | | - } else if entry_type.is_symlink() { |
172 | | - #[cfg(unix)] |
173 | | - if let Some(symlink_target) = entry.link_name()? { |
174 | | - let symlink_path = entry_name; |
175 | | - std::os::unix::fs::symlink(symlink_target.as_ref(), symlink_path)?; |
176 | | - } |
177 | | - } |
178 | | - } |
179 | | - |
180 | | - if out_dir.exists() { |
181 | | - debug!( |
182 | | - "Directory already exists {}; skipping rename and removing extraction directory: {}", |
183 | | - out_dir.to_string_lossy(), |
184 | | - extract_dir.to_string_lossy() |
185 | | - ); |
186 | | - remove_dir_all(&extract_dir)?; |
187 | | - } else { |
188 | | - debug!( |
189 | | - "Renaming {} to {}", |
190 | | - extract_dir.to_string_lossy(), |
191 | | - out_dir.to_string_lossy() |
192 | | - ); |
193 | | - rename(extract_dir, out_dir)?; |
194 | | - } |
195 | | - |
196 | | - if lock_file.is_file() { |
197 | | - debug!("Removing lock file: {}", lock_file.to_string_lossy()); |
198 | | - remove_file(lock_file)?; |
199 | | - } |
200 | | - |
201 | | - debug!( |
202 | | - "Extracting {} files totalling {}", |
203 | | - files.to_formatted_string(&Locale::en), |
204 | | - human_bytes(extracted_bytes as f64) |
205 | | - ); |
206 | | - |
207 | | - Ok(()) |
| 47 | +pub async fn extract(url: &str, bytes: &Vec<u8>, out_dir: &Path) -> Result<()> { |
| 48 | + let extractor_fn = extractor::registry::get(url)?; |
| 49 | + extractor_fn(bytes, out_dir) |
208 | 50 | } |
209 | 51 |
|
210 | 52 | #[cfg(test)] |
|
0 commit comments