Skip to content

Commit a624770

Browse files
committed
cargo fmt
1 parent 58ccc1e commit a624770

6 files changed

Lines changed: 223 additions & 132 deletions

File tree

src/bin/main.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
use std::env;
1+
use datafusion::prelude::*;
2+
use dfkit::commands::{cat, convert, count, describe, dfsplit, query, reverse, schema, sort, view};
23
use dfkit::utils::{DfKitError, parse_file_list};
3-
use structopt::StructOpt;
4+
use std::env;
45
use std::path::PathBuf;
5-
use datafusion::prelude::*;
6-
use dfkit::commands::{view, query, convert, describe, schema, count, sort, reverse, dfsplit, cat};
6+
use structopt::StructOpt;
77

88
#[derive(StructOpt, Debug)]
9-
#[structopt(name = "dfkit", about = "A fast SQL-based CLI tool for working with CSV, Parquet, and JSON data files.")]
9+
#[structopt(
10+
name = "dfkit",
11+
about = "A fast SQL-based CLI tool for working with CSV, Parquet, and JSON data files."
12+
)]
1013
pub struct Cli {
1114
#[structopt(subcommand)]
1215
pub command: Commands,
@@ -38,7 +41,7 @@ pub enum Commands {
3841
#[structopt(parse(from_os_str))]
3942
filename: PathBuf,
4043
#[structopt(parse(from_os_str))]
41-
output_filename: PathBuf,
44+
output: PathBuf,
4245
},
4346

4447
#[structopt(about = "Show summary statistics for a file")]
@@ -65,7 +68,7 @@ pub enum Commands {
6568
filename: PathBuf,
6669
#[structopt(short, long, use_delimiter = true)]
6770
columns: Vec<String>,
68-
#[structopt(short,long)]
71+
#[structopt(short, long)]
6972
descending: bool,
7073
#[structopt(short = "o", long = "output", parse(from_os_str))]
7174
output: Option<PathBuf>,
@@ -83,9 +86,9 @@ pub enum Commands {
8386
Split {
8487
#[structopt(parse(from_os_str))]
8588
filename: PathBuf,
86-
#[structopt(short,long)]
89+
#[structopt(short, long)]
8790
chunks: usize,
88-
#[structopt(parse(from_os_str))]
91+
#[structopt(short, long)]
8992
output: Option<PathBuf>,
9093
},
9194

@@ -95,9 +98,9 @@ pub enum Commands {
9598
files: Option<String>,
9699
#[structopt(long, required_unless = "files")]
97100
dir: Option<PathBuf>,
98-
#[structopt(short, long)]
101+
#[structopt(short, long, parse(from_os_str))]
99102
output: PathBuf,
100-
}
103+
},
101104
}
102105

103106
#[tokio::main]
@@ -110,11 +113,18 @@ async fn main() -> Result<(), DfKitError> {
110113
Commands::View { filename, limit } => {
111114
view(&ctx, &filename, limit).await?;
112115
}
113-
Commands::Query { filename, sql , output} => {
116+
Commands::Query {
117+
filename,
118+
sql,
119+
output,
120+
} => {
114121
query(&ctx, &filename, sql, output).await?;
115122
}
116-
Commands::Convert { filename, output_filename } => {
117-
convert(&ctx, &filename, &output_filename).await?;
123+
Commands::Convert {
124+
filename,
125+
output,
126+
} => {
127+
convert(&ctx, &filename, &output).await?;
118128
}
119129
Commands::Describe { filename } => {
120130
describe(&ctx, &filename).await?;
@@ -125,13 +135,22 @@ async fn main() -> Result<(), DfKitError> {
125135
Commands::Count { filename } => {
126136
count(&ctx, &filename).await?;
127137
}
128-
Commands::Sort { filename, columns, descending, output } => {
138+
Commands::Sort {
139+
filename,
140+
columns,
141+
descending,
142+
output,
143+
} => {
129144
sort(&ctx, &filename, &columns, descending, output).await?;
130145
}
131146
Commands::Reverse { filename, output } => {
132147
reverse(&ctx, &filename, output).await?;
133148
}
134-
Commands::Split { filename, chunks, output} => {
149+
Commands::Split {
150+
filename,
151+
chunks,
152+
output,
153+
} => {
135154
let out_dir = output.unwrap_or_else(|| env::current_dir().unwrap());
136155
dfsplit(&ctx, &filename, chunks, &out_dir).await?;
137156
}

src/commands.rs

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1-
use std::fs;
2-
use std::path::{Path, PathBuf};
3-
use std::sync::Arc;
1+
use crate::utils::{DfKitError, file_type, register_table, write_output};
42
use datafusion::arrow::compute::concat_batches;
53
use datafusion::datasource::MemTable;
64
use datafusion::logical_expr::col;
75
use datafusion::prelude::SessionContext;
8-
use crate::utils::{file_type, register_table, write_output, DfKitError};
6+
use std::fs;
7+
use std::path::{Path, PathBuf};
8+
use std::sync::Arc;
99

10-
pub async fn view(ctx: &SessionContext, filename: &Path, limit: Option<usize>) -> Result<(), DfKitError> {
10+
pub async fn view(
11+
ctx: &SessionContext,
12+
filename: &Path,
13+
limit: Option<usize>,
14+
) -> Result<(), DfKitError> {
1115
let df = register_table(&ctx, "t", &filename).await?;
1216
let limit = limit.unwrap_or(10);
1317

@@ -20,7 +24,12 @@ pub async fn view(ctx: &SessionContext, filename: &Path, limit: Option<usize>) -
2024
Ok(())
2125
}
2226

23-
pub async fn query(ctx: &SessionContext, filename: &Path, sql: Option<String>, output: Option<PathBuf>) -> Result<(), DfKitError> {
27+
pub async fn query(
28+
ctx: &SessionContext,
29+
filename: &Path,
30+
sql: Option<String>,
31+
output: Option<PathBuf>,
32+
) -> Result<(), DfKitError> {
2433
let file_type = file_type(&filename)?;
2534
let _ = register_table(&ctx, "t", &filename).await?;
2635
let df_sql = ctx.sql(&*sql.unwrap()).await?;
@@ -35,7 +44,11 @@ pub async fn query(ctx: &SessionContext, filename: &Path, sql: Option<String>, o
3544
Ok(())
3645
}
3746

38-
pub async fn convert(ctx: &SessionContext, filename: &Path, output_filename: &Path) -> Result<(), DfKitError> {
47+
pub async fn convert(
48+
ctx: &SessionContext,
49+
filename: &Path,
50+
output_filename: &Path,
51+
) -> Result<(), DfKitError> {
3952
let df = register_table(ctx, "t", &filename).await?;
4053
let output_file_type = file_type(&output_filename)?;
4154

@@ -130,9 +143,16 @@ pub async fn reverse(
130143
Ok(())
131144
}
132145

133-
pub async fn dfsplit(ctx: &SessionContext, filename: &Path, chunks: usize, output_dir: &Path) -> Result<(), DfKitError> {
146+
pub async fn dfsplit(
147+
ctx: &SessionContext,
148+
filename: &Path,
149+
chunks: usize,
150+
output_dir: &Path,
151+
) -> Result<(), DfKitError> {
134152
if chunks == 0 {
135-
return Err(DfKitError::CustomError("Chunks must be greater than 0".into()));
153+
return Err(DfKitError::CustomError(
154+
"Chunks must be greater than 0".into(),
155+
));
136156
}
137157
let df = register_table(ctx, "t", filename).await?;
138158
let total_rows = df.clone().count().await?;
@@ -144,7 +164,9 @@ pub async fn dfsplit(ctx: &SessionContext, filename: &Path, chunks: usize, outpu
144164
}
145165

146166
if chunks > total_rows {
147-
return Err(DfKitError::CustomError("Chunks must be smaller than total rows".into()));
167+
return Err(DfKitError::CustomError(
168+
"Chunks must be smaller than total rows".into(),
169+
));
148170
}
149171

150172
fs::create_dir_all(output_dir)?;
@@ -172,7 +194,11 @@ pub async fn dfsplit(ctx: &SessionContext, filename: &Path, chunks: usize, outpu
172194
Ok(())
173195
}
174196

175-
pub async fn cat(ctx: &SessionContext, files: Vec<PathBuf>, out_path: &Path) -> Result<(), DfKitError> {
197+
pub async fn cat(
198+
ctx: &SessionContext,
199+
files: Vec<PathBuf>,
200+
out_path: &Path,
201+
) -> Result<(), DfKitError> {
176202
let mut dfs = vec![];
177203

178204
for (i, file) in files.iter().enumerate() {

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
pub mod utils;
21
pub mod commands;
2+
pub mod utils;

src/utils.rs

Lines changed: 95 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
use std::path::{Path, PathBuf};
21
use datafusion::arrow::error::ArrowError;
32
use datafusion::dataframe::DataFrameWriteOptions;
4-
use datafusion::prelude::*;
53
use datafusion::error::DataFusionError;
6-
use thiserror::Error;
7-
use tempfile::NamedTempFile;
4+
use datafusion::prelude::*;
85
use reqwest::Client;
6+
use std::path::{Path, PathBuf};
7+
use tempfile::NamedTempFile;
8+
use thiserror::Error;
99

1010
#[derive(Debug, PartialEq, Eq)]
1111
pub enum FileFormat {
@@ -47,21 +47,28 @@ pub enum DfKitError {
4747
Reqwest(#[from] reqwest::Error),
4848
}
4949

50-
pub fn file_type(
51-
file_path: &Path,
52-
) -> Result<FileFormat,FileParseError> {
53-
match Path::new(file_path).extension().and_then(|ext| ext.to_str()) {
54-
Some("csv") => Ok(FileFormat::Csv),
55-
Some("parquet") => Ok(FileFormat::Parquet),
56-
Some("json") => Ok(FileFormat::Json),
57-
Some("avro") => Ok(FileFormat::Avro),
58-
Some(_) => Err(FileParseError::UnsupportedFileFormat),
59-
None => Err(FileParseError::InvalidExtension),
60-
}
50+
pub fn file_type(file_path: &Path) -> Result<FileFormat, FileParseError> {
51+
match Path::new(file_path)
52+
.extension()
53+
.and_then(|ext| ext.to_str())
54+
{
55+
Some("csv") => Ok(FileFormat::Csv),
56+
Some("parquet") => Ok(FileFormat::Parquet),
57+
Some("json") => Ok(FileFormat::Json),
58+
Some("avro") => Ok(FileFormat::Avro),
59+
Some(_) => Err(FileParseError::UnsupportedFileFormat),
60+
None => Err(FileParseError::InvalidExtension),
61+
}
6162
}
6263

63-
pub async fn register_table(ctx: &SessionContext, table_name: &str, file_path: &Path) -> Result<DataFrame, DfKitError> {
64-
let path_str = file_path.to_str().ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?;
64+
pub async fn register_table(
65+
ctx: &SessionContext,
66+
table_name: &str,
67+
file_path: &Path,
68+
) -> Result<DataFrame, DfKitError> {
69+
let path_str = file_path
70+
.to_str()
71+
.ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?;
6572
let is_url = path_str.starts_with("http://") || path_str.starts_with("https://");
6673

6774
let actual_path = if is_url {
@@ -72,20 +79,38 @@ pub async fn register_table(ctx: &SessionContext, table_name: &str, file_path: &
7279
};
7380

7481
let file_format = file_type(&actual_path)?;
75-
let file_name = actual_path.to_str().ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?;
82+
let file_name = actual_path
83+
.to_str()
84+
.ok_or(DfKitError::FileParse(FileParseError::InvalidExtension))?;
7685
match file_format {
77-
FileFormat::Csv => ctx.register_csv(table_name, file_name, CsvReadOptions::default()).await?,
78-
FileFormat::Parquet => ctx.register_parquet(table_name, file_name, ParquetReadOptions::default()).await?,
79-
FileFormat::Json => ctx.register_json(table_name, file_name, NdJsonReadOptions::default()).await?,
80-
FileFormat::Avro => ctx.register_avro(table_name, file_name, AvroReadOptions::default()).await?,
86+
FileFormat::Csv => {
87+
ctx.register_csv(table_name, file_name, CsvReadOptions::default())
88+
.await?
89+
}
90+
FileFormat::Parquet => {
91+
ctx.register_parquet(table_name, file_name, ParquetReadOptions::default())
92+
.await?
93+
}
94+
FileFormat::Json => {
95+
ctx.register_json(table_name, file_name, NdJsonReadOptions::default())
96+
.await?
97+
}
98+
FileFormat::Avro => {
99+
ctx.register_avro(table_name, file_name, AvroReadOptions::default())
100+
.await?
101+
}
81102
};
82103

83104
Ok(ctx.table(table_name).await?)
84105
}
85106

86-
pub fn parse_file_list(files: Option<String>, dir: Option<PathBuf>) -> Result<Vec<PathBuf>, DfKitError> {
107+
pub fn parse_file_list(
108+
files: Option<String>,
109+
dir: Option<PathBuf>,
110+
) -> Result<Vec<PathBuf>, DfKitError> {
87111
if let Some(file_str) = files {
88-
Ok(file_str.split(',')
112+
Ok(file_str
113+
.split(',')
89114
.map(|s| PathBuf::from(s.trim()))
90115
.collect())
91116
} else if let Some(dir_path) = dir {
@@ -102,17 +127,46 @@ pub fn parse_file_list(files: Option<String>, dir: Option<PathBuf>) -> Result<Ve
102127
}
103128
Ok(file_list)
104129
} else {
105-
Err(DfKitError::CustomError("No files or directory provided".into()))
130+
Err(DfKitError::CustomError(
131+
"No files or directory provided".into(),
132+
))
106133
}
107134
}
108135

109-
pub async fn write_output(df: DataFrame, out_path: &Path, format: &FileFormat) -> Result<(), DfKitError> {
136+
pub async fn write_output(
137+
df: DataFrame,
138+
out_path: &Path,
139+
format: &FileFormat,
140+
) -> Result<(), DfKitError> {
110141
match format {
111-
FileFormat::Csv => df.write_csv(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
112-
FileFormat::Parquet => df.write_parquet(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
113-
FileFormat::Json => df.write_json(out_path.to_str().unwrap(), DataFrameWriteOptions::default(), None).await?,
142+
FileFormat::Csv => {
143+
df.write_csv(
144+
out_path.to_str().unwrap(),
145+
DataFrameWriteOptions::default(),
146+
None,
147+
)
148+
.await?
149+
}
150+
FileFormat::Parquet => {
151+
df.write_parquet(
152+
out_path.to_str().unwrap(),
153+
DataFrameWriteOptions::default(),
154+
None,
155+
)
156+
.await?
157+
}
158+
FileFormat::Json => {
159+
df.write_json(
160+
out_path.to_str().unwrap(),
161+
DataFrameWriteOptions::default(),
162+
None,
163+
)
164+
.await?
165+
}
114166
FileFormat::Avro => {
115-
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented("Avro write not supported".into())));
167+
return Err(DfKitError::DataFusion(DataFusionError::NotImplemented(
168+
"Avro write not supported".into(),
169+
)));
116170
}
117171
};
118172
Ok(())
@@ -122,13 +176,17 @@ pub async fn download_to_tempfile(url: &str) -> Result<(NamedTempFile, PathBuf),
122176
let response = Client::new().get(url).send().await?.bytes().await?;
123177

124178
// Try to extract the file extension from the URL
125-
let ext = url.split('.').last().and_then(|e| {
126-
let e = e.split('?').next().unwrap_or(e); // strip query string
127-
match e {
128-
"csv" | "json" | "parquet" | "avro" => Some(e),
129-
_ => None,
130-
}
131-
}).ok_or(FileParseError::InvalidExtension)?;
179+
let ext = url
180+
.split('.')
181+
.last()
182+
.and_then(|e| {
183+
let e = e.split('?').next().unwrap_or(e); // strip query string
184+
match e {
185+
"csv" | "json" | "parquet" | "avro" => Some(e),
186+
_ => None,
187+
}
188+
})
189+
.ok_or(FileParseError::InvalidExtension)?;
132190

133191
// Create temp file with extension
134192
let tempfile = NamedTempFile::new()?;
@@ -140,4 +198,3 @@ pub async fn download_to_tempfile(url: &str) -> Result<(NamedTempFile, PathBuf),
140198

141199
Ok((tempfile, path_with_ext))
142200
}
143-

0 commit comments

Comments
 (0)