Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions parquet_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ extern crate quote;

extern crate parquet;

use ::syn::{Data, DataStruct, DeriveInput, parse_macro_input};
use ::syn::{Data, DataStruct, DeriveInput, ext::IdentExt, parse_macro_input};

mod parquet_field;

Expand Down Expand Up @@ -234,6 +234,18 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke

let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect();
let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect();
// unraw the identifiers, so raw identifiers like `r#type` are looked
// up by their column name `type` in the parquet file
let field_names_str: Vec<_> = fields
.iter()
.map(|f| {
f.ident
.as_ref()
.expect("Only structs with named fields are currently supported")
.unraw()
.to_string()
})
.collect();
let reader_snippets: Vec<proc_macro2::TokenStream> =
field_infos.iter().map(|x| x.reader_snippet()).collect();

Expand Down Expand Up @@ -270,10 +282,10 @@ pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::Toke

#(
{
let idx: usize = match name_to_index.get(stringify!(#field_names)) {
let idx: usize = match name_to_index.get(#field_names_str) {
Some(&col_idx) => col_idx,
None => {
let error_msg = format!("column name '{}' is not found in parquet file!", stringify!(#field_names));
let error_msg = format!("column name '{}' is not found in parquet file!", #field_names_str);
return Err(::parquet::errors::ParquetError::General(error_msg));
}
};
Expand Down
25 changes: 24 additions & 1 deletion parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use syn::ext::IdentExt;

#[derive(Debug, PartialEq)]
pub struct Field {
ident: syn::Ident,
Expand Down Expand Up @@ -293,7 +295,9 @@ impl Field {
// TODO: Support group types
// TODO: Add length if dealing with fixedlenbinary

let field_name = &self.ident.to_string();
// unraw the identifier, so a raw identifier like `r#type`
// becomes a column named `type` in the parquet schema
let field_name = self.ident.unraw().to_string();
let physical_type = match self.ty.physical_type() {
parquet::basic::Type::BOOLEAN => quote! {
::parquet::basic::Type::BOOLEAN
Expand Down Expand Up @@ -880,6 +884,25 @@ mod test {
)
}

// A field declared with a raw identifier (`r#type`) must be emitted into the
// generated parquet type under its unprefixed name (`type`).
#[test]
fn test_parquet_type_with_raw_identifier() {
    let struct_def: proc_macro2::TokenStream = quote! {
        struct ABoringStruct {
            r#type: i32,
        }
    };

    let parsed_fields = extract_fields(struct_def);
    let raw_field = Field::from(&parsed_fields[0]);

    // render the generated tokens to text so the column name can be inspected
    let snippet = raw_field.parquet_type().to_string();
    let names_column_without_prefix = snippet.contains("primitive_type_builder (\"type\"");
    assert!(names_column_without_prefix, "{snippet}");
}

#[test]
fn test_optional_to_writer_snippet() {
let struct_def: proc_macro2::TokenStream = quote! {
Expand Down
49 changes: 49 additions & 0 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ struct APrunedRecord {
pub isize: isize,
}

// Test record with one field declared through a raw identifier.
// The derived writer/reader are expected to map such a field to a parquet
// column named without the `r#` prefix (e.g. a column named `type`, as
// written by other tools), while ordinary field names are used unchanged.
#[derive(PartialEq, ParquetRecordWriter, ParquetRecordReader, Debug)]
struct ARecordWithRawIdentifiers {
// raw identifier: `type` is a Rust keyword, so the field is spelled `r#type`
pub r#type: i32,
// ordinary identifier, kept alongside as the unchanged-name baseline
pub count: i32,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -356,6 +365,46 @@ mod tests {
assert_eq!(drs[0].isize, out[0].isize);
}

// Round-trips a record containing a raw-identifier field through a parquet
// file: checks the generated column names, writes one row group, reads it
// back and compares the result with the input.
#[test]
fn test_parquet_derive_raw_identifiers() {
    use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};

    let drs = vec![ARecordWithRawIdentifiers {
        r#type: 456,
        count: 123,
    }];
    let file = get_temp_file("test_parquet_derive_raw_identifiers", &[]);

    let generated_schema = drs.as_slice().schema().unwrap();

    // the `r#` prefix is dropped from raw identifiers,
    // ordinary identifiers keep their name as-is
    {
        let column_names: Vec<_> = generated_schema
            .get_fields()
            .iter()
            .map(|column| column.name())
            .collect();
        assert_eq!(vec!["type", "count"], column_names);
    }

    // write the records as a single row group
    let mut writer = SerializedFileWriter::new(
        file.try_clone().unwrap(),
        generated_schema,
        Default::default(),
    )
    .unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    drs.as_slice()
        .write_to_row_group(&mut row_group_writer)
        .unwrap();
    row_group_writer.close().unwrap();
    writer.close().unwrap();

    // read the row group back into fresh records
    let reader = SerializedFileReader::new(file).unwrap();
    let mut out = Vec::<ARecordWithRawIdentifiers>::new();
    let mut row_group_reader = reader.get_row_group(0).unwrap();
    out.read_from_row_group(&mut *row_group_reader, 1).unwrap();

    assert_eq!(drs, out);
}

#[test]
fn test_aliased_result() {
// Issue 7547, Where aliasing the `Result` led to
Expand Down
Loading