Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ union LogicalType {
17: (GeometryType) Geometry
/// A geospatial feature in the WKB format with an explicit (non-linear/non-planar) edges interpolation.
18: (GeographyType) Geography
/// A reference to an external file.
19: File
}
);

Expand Down Expand Up @@ -1054,6 +1056,7 @@ impl ColumnOrder {
LogicalType::Variant(_)
| LogicalType::Geometry(_)
| LogicalType::Geography(_)
| LogicalType::File
| LogicalType::_Unknown { .. } => SortOrder::UNDEFINED,
},
// Fall back to converted type
Expand Down Expand Up @@ -1242,6 +1245,7 @@ impl From<Option<LogicalType>> for ConvertedType {
| LogicalType::Variant(_)
| LogicalType::Geometry(_)
| LogicalType::Geography(_)
| LogicalType::File
| LogicalType::_Unknown { .. }
| LogicalType::Unknown => ConvertedType::NONE,
},
Expand Down Expand Up @@ -1341,6 +1345,7 @@ impl str::FromStr for LogicalType {
)),
"FLOAT16" => Ok(LogicalType::Float16),
"VARIANT" => Ok(LogicalType::variant(None)),
"FILE" => Ok(LogicalType::File),
"GEOMETRY" => Ok(LogicalType::geometry(None)),
"GEOGRAPHY" => Ok(LogicalType::geography(
None,
Expand Down
40 changes: 40 additions & 0 deletions parquet/src/schema/printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ fn print_logical_and_converted(
format!("GEOGRAPHY({algorithm})")
}
}
LogicalType::File => "FILE".to_string(),
LogicalType::Unknown => "UNKNOWN".to_string(),
LogicalType::_Unknown { field_id } => format!("_Unknown({field_id})"),
},
Expand Down Expand Up @@ -1173,4 +1174,43 @@ mod tests {

assert_print_parse_message(message);
}

// Builds a schema whose single child is a group annotated with
// `LogicalType::File` (a REQUIRED string `path` plus an OPTIONAL INT64 `size`)
// and compares the printer's output against the expected text.
#[test]
fn test_print_file_type() {
let mut s = String::new();
{
// Inner scope: the printer holds `&mut s`, which must be released before
// `s` is used in the assertion below.
let mut p = Printer::new(&mut s);
let path_field = Arc::new(
Type::primitive_type_builder("path", PhysicalType::BYTE_ARRAY)
.with_repetition(Repetition::REQUIRED)
.with_logical_type(Some(LogicalType::String))
.build()
.unwrap(),
);
let size_field = Arc::new(
Type::primitive_type_builder("size", PhysicalType::INT64)
.with_repetition(Repetition::OPTIONAL)
.build()
.unwrap(),
);
// The group under test: carries the FILE annotation and both children.
let file_field = Type::group_type_builder("f")
.with_repetition(Repetition::REQUIRED)
.with_logical_type(Some(LogicalType::File))
.with_fields(vec![path_field, size_field])
.build()
.unwrap();
// Root message wrapping the FILE group.
let message = Type::group_type_builder("schema")
.with_fields(vec![Arc::new(file_field)])
.build()
.unwrap();
p.print(&message);
}
// Expected printer output; the group line carries the `(FILE)` annotation.
let expected = "message schema {
REQUIRED group f (FILE) {
REQUIRED BYTE_ARRAY path (STRING);
OPTIONAL INT64 size;
}
}";
assert_eq!(&mut s, expected);
}
}
227 changes: 226 additions & 1 deletion parquet/src/schema/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl<'a> PrimitiveTypeBuilder<'a> {
}
// Check that logical type and physical type are compatible
match (logical_type, self.physical_type) {
(LogicalType::Map, _) | (LogicalType::List, _) => {
(LogicalType::Map, _) | (LogicalType::List, _) | (LogicalType::File, _) => {
return Err(general_err!(
"{:?} cannot be applied to a primitive type for field '{}'",
logical_type,
Expand Down Expand Up @@ -645,6 +645,9 @@ impl<'a> GroupTypeBuilder<'a> {

/// Creates a new `GroupType` instance from the gathered attributes.
pub fn build(self) -> Result<Type> {
if let Some(LogicalType::File) = &self.logical_type {
validate_file_type_fields(self.name, &self.fields)?;
}
let mut basic_info = BasicTypeInfo {
name: String::from(self.name),
repetition: self.repetition,
Expand All @@ -663,6 +666,39 @@ impl<'a> GroupTypeBuilder<'a> {
}
}

/// Validates the direct children of a group annotated with `LogicalType::File`.
///
/// Children are examined in declaration order and the first violation wins:
///
/// * a child named `path` must carry an explicit `REQUIRED` repetition;
/// * any other child must be named `size`, `offset` or `etag`.
///
/// After all children have been accepted, the group must have contained a
/// `path` child.
///
/// # Errors
///
/// Returns a general error naming the group `name` (and the offending child,
/// where applicable) when any of the rules above is broken.
fn validate_file_type_fields(name: &str, fields: &[TypePtr]) -> Result<()> {
    let mut path_seen = false;

    for child in fields {
        let info = child.get_basic_info();
        match info.name() {
            "path" => {
                // `repetition()` is only consulted once `has_repetition()` is true.
                if !info.has_repetition() || info.repetition() != Repetition::REQUIRED {
                    return Err(general_err!(
                        "FILE type field 'path' must be REQUIRED in group '{}'",
                        name
                    ));
                }
                path_seen = true;
            }
            // Recognized optional members; nothing further is checked for them here.
            "size" | "offset" | "etag" => {}
            unexpected => {
                return Err(general_err!(
                    "FILE type group '{}' contains unrecognized field '{}'. \
                     Valid fields are: path, size, offset, etag",
                    name,
                    unexpected
                ));
            }
        }
    }

    if path_seen {
        Ok(())
    } else {
        Err(general_err!(
            "FILE type group '{}' must contain required field 'path'",
            name
        ))
    }
}

/// Basic type info. This contains information such as the name of the type,
/// the repetition level, the logical type and the kind of the type (group, primitive).
#[derive(Clone, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -2505,6 +2541,195 @@ mod tests {
assert_eq!(result_schema, expected_schema);
}

/// A FILE-annotated group with all four recognized children must survive a
/// schema round trip unchanged, annotation included.
#[test]
fn test_file_logical_type_roundtrip() {
    // BYTE_ARRAY leaf annotated as a string, with the given repetition.
    let utf8_leaf = |leaf_name: &str, repetition: Repetition| {
        Arc::new(
            Type::primitive_type_builder(leaf_name, PhysicalType::BYTE_ARRAY)
                .with_repetition(repetition)
                .with_logical_type(Some(LogicalType::String))
                .build()
                .unwrap(),
        )
    };
    // OPTIONAL INT64 leaf without a logical type.
    let int64_leaf = |leaf_name: &str| {
        Arc::new(
            Type::primitive_type_builder(leaf_name, PhysicalType::INT64)
                .with_repetition(Repetition::OPTIONAL)
                .build()
                .unwrap(),
        )
    };

    let children = vec![
        utf8_leaf("path", Repetition::REQUIRED),
        int64_leaf("size"),
        int64_leaf("offset"),
        utf8_leaf("etag", Repetition::OPTIONAL),
    ];
    let file_group = Type::group_type_builder("f")
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .with_fields(children)
        .build()
        .unwrap();
    let schema = Arc::new(
        Type::group_type_builder("example")
            .with_fields(vec![Arc::new(file_group)])
            .build()
            .unwrap(),
    );

    let decoded = roundtrip_schema(schema.clone()).unwrap();
    assert_eq!(decoded, schema);

    // The first (only) child of the root must still carry the FILE annotation.
    let annotation = decoded.get_fields()[0].get_basic_info().logical_type_ref();
    assert_eq!(annotation, Some(&LogicalType::File));
}

/// A FILE group containing nothing but a REQUIRED `path` child is accepted
/// and keeps its annotation.
#[test]
fn test_file_logical_type_path_only() {
    let only_child = Type::primitive_type_builder("path", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::String))
        .build()
        .unwrap();

    let outcome = Type::group_type_builder("file_field")
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .with_fields(vec![Arc::new(only_child)])
        .build();
    assert!(outcome.is_ok());

    let group = outcome.unwrap();
    let annotation = group.get_basic_info().logical_type_ref();
    assert_eq!(annotation, Some(&LogicalType::File));
}

/// A FILE group carrying `path`, `size`, `offset` and `etag` is accepted and
/// retains all four children.
#[test]
fn test_file_logical_type_all_fields() {
    let path = Type::primitive_type_builder("path", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::String))
        .build()
        .unwrap();
    let size = Type::primitive_type_builder("size", PhysicalType::INT64)
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();
    let offset = Type::primitive_type_builder("offset", PhysicalType::INT64)
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();
    let etag = Type::primitive_type_builder("etag", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::OPTIONAL)
        .with_logical_type(Some(LogicalType::String))
        .build()
        .unwrap();

    // Same child order as declared above: path, size, offset, etag.
    let members = vec![
        Arc::new(path),
        Arc::new(size),
        Arc::new(offset),
        Arc::new(etag),
    ];
    let outcome = Type::group_type_builder("file_field")
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .with_fields(members)
        .build();

    assert!(outcome.is_ok());
    let group = outcome.unwrap();
    assert_eq!(group.get_fields().len(), 4);
}

/// A FILE group without a `path` child must be rejected with the
/// "must contain required field 'path'" error.
#[test]
fn test_file_logical_type_requires_path_field() {
    let size_only = Type::primitive_type_builder("size", PhysicalType::INT64)
        .with_repetition(Repetition::OPTIONAL)
        .build()
        .unwrap();

    let outcome = Type::group_type_builder("missing_path")
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .with_fields(vec![Arc::new(size_only)])
        .build();

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("must contain required field 'path'"));
}

/// A FILE group containing a child that is not one of the recognized names
/// must be rejected, and the error must identify both the group and the
/// offending child.
#[test]
fn test_file_logical_type_rejects_unrecognized_field() {
    let path_field = Arc::new(
        Type::primitive_type_builder("path", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::REQUIRED)
            .with_logical_type(Some(LogicalType::String))
            .build()
            .unwrap(),
    );
    let unknown_field = Arc::new(
        Type::primitive_type_builder("unknown_field", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .build()
            .unwrap(),
    );
    let result = Type::group_type_builder("bad_file")
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .with_fields(vec![path_field, unknown_field])
        .build();
    assert!(result.is_err());
    // Pin the specific group and field names, not just the generic phrase, so
    // the test fails if the wrong child is reported.
    let message = result.unwrap_err().to_string();
    assert!(
        message.contains("FILE type group 'bad_file' contains unrecognized field 'unknown_field'"),
        "unexpected error message: {message}"
    );

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: in general I prefer full string compares to contains. In this case it should at least test for the specific unrecognized field name 'unknown_field'.

}

/// A FILE group whose `path` child is OPTIONAL instead of REQUIRED must be
/// rejected with the "'path' must be REQUIRED" error.
#[test]
fn test_file_logical_type_requires_required_path_field() {
    let optional_path = Type::primitive_type_builder("path", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::OPTIONAL)
        .with_logical_type(Some(LogicalType::String))
        .build()
        .unwrap();

    let outcome = Type::group_type_builder("optional_path")
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .with_fields(vec![Arc::new(optional_path)])
        .build();

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("'path' must be REQUIRED"));
}

/// `LogicalType::File` is a group-only annotation: applying it to a primitive
/// column must fail with the "cannot be applied to a primitive type" error.
#[test]
fn test_file_logical_type_not_allowed_on_primitive() {
    let result = Type::primitive_type_builder("bad", PhysicalType::BYTE_ARRAY)
        .with_repetition(Repetition::REQUIRED)
        .with_logical_type(Some(LogicalType::File))
        .build();
    assert!(result.is_err());
    // Check the reason as well, so an unrelated builder failure cannot make
    // this test pass by accident.
    let message = result.unwrap_err().to_string();
    assert!(
        message.contains("cannot be applied to a primitive type for field 'bad'"),
        "unexpected error message: {message}"
    );
}

#[test]
fn test_parquet_schema_from_array_rejects_negative_num_children() {
let elements = vec![SchemaElement {
Expand Down
Loading