-
Notifications
You must be signed in to change notification settings - Fork 104
Expand file tree
/
Copy pathtranspose.rs
More file actions
333 lines (287 loc) · 13.2 KB
/
transpose.rs
File metadata and controls
333 lines (287 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
// Usage/help text for the `transpose` command. `run` hands this string to
// `util::get_args` together with argv to populate `Args`, so the option names
// below are part of the command's behavior, not just documentation.
static USAGE: &str = r#"
Transpose the rows/columns of CSV data.
Usage:
qsv transpose [options] [<input>]
qsv transpose --help
Examples:
# Transpose data in-memory.
$ qsv transpose data.csv
# Transpose data using multiple passes. For large datasets.
$ qsv transpose data.csv --multipass
# Convert CSV to "long" format using the first column as the "field" identifier
$ qsv transpose data.csv --long 1
# use the columns "name" & "age" as the "field" identifier
$ qsv transpose --long "name,age" data.csv
# use the columns 1 & 3 as the "field" identifier
$ qsv transpose --long 1,3 data.csv
# use the columns 1 to 3 as the "field" identifier
$ qsv transpose --long 1-3 data.csv
# use all columns starting with "name" as the "field" identifier
$ qsv transpose --long /^name/ data.csv
See https://github.com/dathere/qsv/blob/master/tests/test_transpose.rs for more examples.
transpose options:
-m, --multipass Process the transpose by making multiple passes
over the dataset. Consumes memory relative to
the number of rows.
Note that in general it is faster to
process the transpose in memory.
Useful for really big datasets as the default
is to read the entire dataset into memory.
-s, --select <arg> Select a subset of columns to transpose.
When used with --long, this filters which columns
become attribute rows (the field columns are unaffected).
See 'qsv select --help' for the full selection syntax.
--long <selection> Convert wide-format CSV to "long" format.
Output format is three columns:
field, attribute, value. Empty values are skipped.
Mutually exclusive with --multipass.
The <selection> argument is REQUIRED when using --long,
it specifies which column(s) to use as the "field" identifier.
It uses the same selection syntax as 'qsv select':
* Column names: --long varname or --long "column name"
* Column indices (1-based): --long 5 or --long 2,3
* Ranges: --long 1-4 or --long 3-
* Regex patterns: --long /^prefix/
* Comma-separated: --long var1,var2 or --long 1,3,5
Multiple field columns are concatenated with | separator.
Common options:
-h, --help Display this message
-o, --output <file> Write output to <file> instead of stdout.
-d, --delimiter <arg> The field delimiter for reading CSV data.
Must be a single character. (default: ,)
--memcheck Check if there is enough memory to load the entire
CSV into memory using CONSERVATIVE heuristics.
Ignored when --multipass or --long option is enabled.
"#;
use std::{fs::File, str};
use csv::ByteRecord;
use foldhash::HashSet;
use memmap2::MmapOptions;
use serde::Deserialize;
use crate::{
CliError, CliResult,
config::{Config, DEFAULT_WTR_BUFFER_CAPACITY, Delimiter},
select::SelectColumns,
util,
};
/// Command-line arguments for `qsv transpose`, deserialized from `USAGE`.
// The lint is silenced because `impl Args` contains an `unsafe` block (the
// memory map in `multipass_transpose_streaming`) while the type derives
// `Deserialize`.
#[allow(clippy::unsafe_derive_deserialize)]
#[derive(Deserialize)]
struct Args {
/// Positional `<input>`; `run` treats `None` and `"-"` as stdin.
arg_input: Option<String>,
/// `-o, --output <file>`: output destination passed to the writer config.
flag_output: Option<String>,
/// `-d, --delimiter <arg>`: field delimiter applied to the reader config.
flag_delimiter: Option<Delimiter>,
/// `-m, --multipass`: transpose with one pass over the input per output row.
flag_multipass: bool,
/// `-s, --select <arg>`: optional subset of columns to transpose.
flag_select: Option<SelectColumns>,
/// `--long <selection>`: raw selection string naming the "field" column(s);
/// parsed with `SelectColumns::parse` inside `wide_to_long`.
flag_long: Option<String>,
/// `--memcheck`: forwarded to `util::mem_file_check` before an in-memory transpose.
flag_memcheck: bool,
}
/// Entry point for `qsv transpose`.
///
/// Parses `argv` against `USAGE`, rejects the `--long` + `--multipass`
/// combination with a usage error, and then dispatches to the wide-to-long
/// conversion, the multipass transpose, or the in-memory transpose.
pub fn run(argv: &[&str]) -> CliResult<()> {
    let args: Args = util::get_args(USAGE, argv)?;

    let long_requested = args.flag_long.is_some();

    // --long and --multipass are mutually exclusive
    if long_requested && args.flag_multipass {
        return fail_incorrectusage_clierror!(
            "The --long and --multipass options are mutually exclusive."
        );
    }

    if long_requested {
        return args.wide_to_long();
    }

    // A missing <input> and a literal "-" both mean "read from stdin".
    let reads_stdin = matches!(args.arg_input.as_deref(), None | Some("-"));

    // Multipass is only used for non-stdin input; everything else is
    // transposed in memory.
    if reads_stdin || !args.flag_multipass {
        args.in_memory_transpose()
    } else {
        args.multipass_transpose_streaming()
    }
}
impl Args {
/// Resolve --select against the given header record.
/// Returns Ok(None) when --select was not specified.
fn parse_select(&self, headers: &ByteRecord) -> CliResult<Option<Vec<usize>>> {
let Some(ref sel) = self.flag_select else {
return Ok(None);
};
let selection = sel
.selection(headers, true)
.map_err(|e| CliError::Other(format!("--select error: {e}")))?;
if selection.is_empty() {
return fail_incorrectusage_clierror!("--select resulted in no columns to transpose.");
}
Ok(Some(selection.iter().copied().collect()))
}
fn wide_to_long(&self) -> CliResult<()> {
let mut rdr = Config::new(self.arg_input.as_ref())
.delimiter(self.flag_delimiter)
.no_headers(false)
.reader()?;
let mut wtr = self.wconfig().writer()?;
let headers = rdr.byte_headers()?.clone();
if headers.is_empty() {
return fail_incorrectusage_clierror!("CSV file must have at least one column.");
}
// --long is required by docopt; defensively report a usage error if absent.
let selection_str = match self.flag_long.as_deref() {
Some(s) => s,
None => {
return fail_incorrectusage_clierror!(
"--long requires a column selection argument."
);
},
};
let select_cols = SelectColumns::parse(selection_str)
.map_err(|e| CliError::Other(format!("--long parse error: {e}")))?;
let selection = select_cols
.selection(&headers, true)
.map_err(|e| CliError::Other(format!("--long selection error: {e}")))?;
if selection.is_empty() {
return fail_incorrectusage_clierror!(
"--long resulted in no columns. At least one field column is required."
);
}
let field_column_indices: Vec<usize> = selection.iter().copied().collect();
let field_column_set: HashSet<usize> = field_column_indices.iter().copied().collect();
// --select filters which columns become attribute rows (field columns are unaffected)
let selected_attribute_set: Option<HashSet<usize>> = self
.parse_select(&headers)?
.map(|v| v.into_iter().collect());
// Write output headers
let mut header_record = ByteRecord::with_capacity(64, 3);
header_record.push_field(b"field");
header_record.push_field(b"attribute");
header_record.push_field(b"value");
wtr.write_byte_record(&header_record)?;
// Reusable buffers (allocated once, reused per row).
let multi_field = field_column_indices.len() > 1;
let mut field_buf: Vec<u8> = Vec::with_capacity(256);
let mut output_record = ByteRecord::with_capacity(256, 3);
let mut data_record = ByteRecord::new();
while rdr.read_byte_record(&mut data_record)? {
// Build the field key — borrow the slice for the single-column case to
// avoid an allocation per row; concatenate into a reused buffer otherwise.
let field_slice: &[u8] = if multi_field {
field_buf.clear();
for (i, &idx) in field_column_indices.iter().enumerate() {
if i > 0 {
field_buf.push(b'|');
}
if let Some(v) = data_record.get(idx) {
field_buf.extend_from_slice(v);
}
}
&field_buf
} else {
data_record.get(field_column_indices[0]).unwrap_or(b"")
};
// Iterate through all columns, skipping field columns and non-selected columns
for (i, attribute_header) in headers.iter().enumerate() {
if field_column_set.contains(&i) {
continue;
}
if let Some(ref sel_set) = selected_attribute_set
&& !sel_set.contains(&i)
{
continue;
}
if let Some(value) = data_record.get(i)
&& !value.is_empty()
{
output_record.clear();
output_record.push_field(field_slice);
output_record.push_field(attribute_header);
output_record.push_field(value);
wtr.write_byte_record(&output_record)?;
}
}
}
Ok(wtr.flush()?)
}
/// Transpose by loading every input record into memory.
///
/// When the reader config carries a file path and `util::mem_file_check`
/// rejects it, a notice is printed to stderr and the work is handed to
/// `multipass_transpose_streaming` instead.
fn in_memory_transpose(&self) -> CliResult<()> {
    // Everything is about to be held in memory, so check available memory first.
    if let Some(input_path) = self.rconfig().path
        && let Err(e) = util::mem_file_check(&input_path, false, self.flag_memcheck)
    {
        eprintln!("File too large for in-memory transpose: {e}.\nDoing multipass transpose...");
        return self.multipass_transpose_streaming();
    }

    let mut rdr = self.rconfig().reader()?;
    let mut wtr = self.wconfig().writer()?;

    // `rconfig()` sets no_headers(true), so the input's header row arrives as an
    // ordinary first record. It takes part in the transpose and also serves as
    // the header for resolving --select by name.
    let rows = rdr.byte_records().collect::<Result<Vec<_>, _>>()?;
    let first_row = rows.first();
    let ncols = first_row.map_or(0, ByteRecord::len);

    let empty_rec = ByteRecord::new();
    let headers_for_select = first_row.unwrap_or(&empty_rec);
    let indices: Vec<usize> = match self.parse_select(headers_for_select)? {
        Some(selected) => selected,
        None => (0..ncols).collect(),
    };

    // One output row per selected input column, reusing a single record buffer.
    let mut out_row = ByteRecord::with_capacity(1024, rows.len());
    for col in indices {
        out_row.clear();
        // Rows too short to have this column contribute nothing.
        for cell in rows.iter().filter_map(|row| row.get(col)) {
            out_row.push_field(cell);
        }
        wtr.write_byte_record(&out_row)?;
    }

    wtr.flush()?;
    Ok(())
}
fn multipass_transpose_streaming(&self) -> CliResult<()> {
// Memory map the file for efficient cross-pass access.
// No `.populate()` here on purpose — `--multipass` exists to avoid loading
// the whole dataset into memory, so we let the OS page in lazily.
let file = File::open(self.arg_input.as_ref().unwrap())?;
// safety: `run()` only routes here when `input_is_stdin == false`, so
// `arg_input` names an on-disk file that can be memory-mapped. The
// `file` binding stays in scope for the rest of this function and all
// uses of `mmap` are confined to the same scope, so the file handle
// outlives the mapping. We open the file read-only and only ever read
// from `&mmap[..]` to feed CSV parsers across passes — this command
// does not mutate or truncate the file. As with any file-backed mmap,
// soundness still relies on no other process concurrently truncating
// or otherwise mutating the file while the mapping is live.
let mmap = unsafe { MmapOptions::new().map(&file)? };
let rconfig = self.rconfig();
// Read the first record to determine column count & resolve --select. This
// also serves as the header row for name-based selection.
let mut header_rdr = rconfig.from_reader(&mmap[..]);
let mut headers = ByteRecord::new();
let _ = header_rdr.read_byte_record(&mut headers)?;
let ncols = headers.len();
drop(header_rdr);
let indices: Vec<usize> = self
.parse_select(&headers)?
.unwrap_or_else(|| (0..ncols).collect());
let mut wtr = self.wconfig().writer()?;
let mut record = ByteRecord::with_capacity(1024, ncols);
for i in indices {
record.clear();
// Restart parsing of the mmap'd CSV for this output column.
// The mmap stays mapped across passes, so we get page-cache locality
// rather than re-reading bytes from disk.
let mut rdr = rconfig.from_reader(&mmap[..]);
for row in rdr.byte_records() {
let row = row?;
if i < row.len() {
record.push_field(&row[i]);
}
}
wtr.write_byte_record(&record)?;
}
Ok(wtr.flush()?)
}
/// Build the writer `Config` for `--output`, with an enlarged write buffer.
fn wconfig(&self) -> Config {
    // Rows produced by a transpose can be very wide, so use a buffer 20x the
    // default size to amortize write syscalls.
    let buffer_capacity = DEFAULT_WTR_BUFFER_CAPACITY * 20;
    let output = self.flag_output.as_ref();
    Config::new(output).set_write_buffer(buffer_capacity)
}
/// Build the reader `Config` for the input, using `--delimiter` and treating
/// the first row as data (`no_headers(true)`) so it takes part in the transpose.
fn rconfig(&self) -> Config {
    let base = Config::new(self.arg_input.as_ref());
    let delimited = base.delimiter(self.flag_delimiter);
    delimited.no_headers(true)
}
}