diff --git a/LICENSE b/LICENSE
index 80cfd3652e69..13ee00992e69 100644
--- a/LICENSE
+++ b/LICENSE
@@ -229,6 +229,7 @@ This product includes code from Apache Parquet.
 * DynConstructors.java
 * IOUtil.java readFully and tests
 * ByteBufferInputStream implementations and tests
+* ByteStreamSplitValuesReader implementation
 
 Copyright: 2014-2017 The Apache Software Foundation.
 Home page: https://parquet.apache.org/
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java
new file mode 100644
index 000000000000..42c288b16886
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * A {@link VectorizedValuesReader} implementation for the encoding type BYTE_STREAM_SPLIT. This is
+ * adapted from Parquet's ByteStreamSplitValuesReader.
+ *
+ * <p>The page is decoded lazily, on the first read, into a little-endian buffer. Single-value and
+ * batch reads both consume that buffer sequentially.
+ *
+ * @see <a href="https://parquet.apache.org/docs/file-format/data-pages/encodings/">Parquet format
+ *     encodings: BYTE_STREAM_SPLIT</a>
+ */
+public class VectorizedByteStreamSplitValuesReader extends ValuesReader
+    implements VectorizedValuesReader {
+
+  // Number of encoded bytes available in the page when initFromPage was called.
+  private int totalBytesInStream;
+  // Page input stream; sliced when the page is first decoded.
+  private ByteBufferInputStream in;
+  // Decoded page values; null until the first read after initFromPage.
+  private ByteBuffer decodedDataStream;
+
+  public VectorizedByteStreamSplitValuesReader() {}
+
+  /**
+   * Binds this reader to a page. Decoding is deferred to the first read because the element width
+   * (float or double) is only supplied by the read methods.
+   *
+   * @param ignoredValueCount not used; the value count is derived from the stream length
+   * @param inputStream encoded page data; all of its available bytes are treated as values
+   */
+  @Override
+  public void initFromPage(int ignoredValueCount, ByteBufferInputStream inputStream)
+      throws IOException {
+    totalBytesInStream = inputStream.available();
+    this.in = inputStream;
+    // Drop any buffer decoded from a previous page so a reused reader never serves stale values.
+    this.decodedDataStream = null;
+  }
+
+  /** Returns the next float of the page, decoding the page on first use. */
+  @Override
+  public float readFloat() {
+    ensureDecodedBufferIsInitializedForElementSize(FLOAT_SIZE);
+    return decodedDataStream.getFloat();
+  }
+
+  /** Returns the next double of the page, decoding the page on first use. */
+  @Override
+  public double readDouble() {
+    ensureDecodedBufferIsInitializedForElementSize(DOUBLE_SIZE);
+    return decodedDataStream.getDouble();
+  }
+
+  /**
+   * Copies the next {@code total} floats of the page into {@code vec}, starting at row {@code
+   * rowId} of the vector.
+   */
+  @Override
+  public void readFloats(int total, FieldVector vec, int rowId) {
+    readValues(
+        FLOAT_SIZE,
+        total,
+        rowId,
+        offset -> vec.getDataBuffer().setFloat(offset, decodedDataStream.getFloat()));
+  }
+
+  /**
+   * Copies the next {@code total} doubles of the page into {@code vec}, starting at row {@code
+   * rowId} of the vector.
+   */
+  @Override
+  public void readDoubles(int total, FieldVector vec, int rowId) {
+    readValues(
+        DOUBLE_SIZE,
+        total,
+        rowId,
+        offset -> vec.getDataBuffer().setDouble(offset, decodedDataStream.getDouble()));
+  }
+
+  /** Decodes the page on first use; later calls keep the already decoded buffer. */
+  private void ensureDecodedBufferIsInitializedForElementSize(int elementSizeInBytes) {
+    if (decodedDataStream == null) {
+      decodedDataStream =
+          decodeDataFromStream(totalBytesInStream / elementSizeInBytes, elementSizeInBytes);
+    }
+  }
+
+  /**
+   * Moves {@code total} values from the decoded page into the output vector.
+   *
+   * <p>{@code rowId} addresses the destination only. The source cursor is the decoded buffer's own
+   * position, which advances with every value read, so each call continues where the previous
+   * read stopped. Deriving the source position from {@code rowId} would read the wrong values
+   * whenever the two indexes differ, e.g. when earlier rows of the vector were filled from another
+   * page or are null (null rows have no entry in the page data).
+   */
+  private void readValues(int elementSizeInBytes, int total, int rowId, OutputWriter outputWriter) {
+    ensureDecodedBufferIsInitializedForElementSize(elementSizeInBytes);
+    for (int i = 0; i < total; i++) {
+      int offset = (rowId + i) * elementSizeInBytes;
+      outputWriter.writeToOutput(offset);
+    }
+  }
+
+  /** Writes the next decoded value at the given byte offset of the output buffer. */
+  @FunctionalInterface
+  interface OutputWriter {
+    void writeToOutput(int offset);
+  }
+
+  /**
+   * Rebuilds the page values from the BYTE_STREAM_SPLIT layout, in which byte {@code k} of every
+   * value is stored contiguously in stream {@code k}.
+   *
+   * @param valuesCount number of values in the page, which is also the length of each byte stream
+   * @param elementSizeInBytes width of one value, which is also the number of byte streams
+   * @return a little-endian buffer, positioned at 0, holding the values in their original order
+   */
+  private ByteBuffer decodeDataFromStream(int valuesCount, int elementSizeInBytes) {
+    ByteBuffer encoded;
+    try {
+      encoded = in.slice(totalBytesInStream).slice();
+    } catch (EOFException e) {
+      throw new RuntimeException("Failed to read bytes from stream", e);
+    }
+    byte[] decoded = new byte[encoded.limit()];
+    int destByteIndex = 0;
+    for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) {
+      for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) {
+        decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount);
+      }
+    }
+    return ByteBuffer.wrap(decoded).order(ByteOrder.LITTLE_ENDIAN);
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public boolean readBoolean() {
+    throw new UnsupportedOperationException("readBoolean is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public byte readByte() {
+    throw new UnsupportedOperationException("readByte is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public short readShort() {
+    throw new UnsupportedOperationException("readShort is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public int readInteger() {
+    throw new UnsupportedOperationException("readInteger is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public long readLong() {
+    throw new UnsupportedOperationException("readLong is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public Binary readBinary(int len) {
+    throw new UnsupportedOperationException("readBinary is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public void readIntegers(int total, FieldVector vec, int rowId) {
+    throw new UnsupportedOperationException("readIntegers is not supported");
+  }
+
+  /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */
+  @Override
+  public void readLongs(int total, FieldVector vec, int rowId) {
+    throw new UnsupportedOperationException("readLongs is not supported");
+  }
+
+  /** The Iceberg reader currently does not do skipping */
+  @Override
+  public void skip() {
+    throw new UnsupportedOperationException("skip is not supported");
+  }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index be1a3324ae43..c5613c1b67b2 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -100,6 +100,9 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i
         case DELTA_BINARY_PACKED:
           valuesReader = new VectorizedDeltaEncodedValuesReader();
           break;
+        case BYTE_STREAM_SPLIT:
+          valuesReader = new VectorizedByteStreamSplitValuesReader();
+          break;
         default:
           throw new UnsupportedOperationException(
               "Cannot support vectorized reads for column "
diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet
new file mode 100644
index 000000000000..3e0edd2627ab
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet differ
diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet
new file mode 100644
index 000000000000..d23c1e6b7387
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet differ
diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet
new file mode 100644
index 000000000000..8ba32a302f08
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet differ
diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet
new file mode 100644
index 000000000000..c16e10e6804e
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet
new file mode 100644
index 000000000000..edd614c66af6
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet
new file mode 100644
index 000000000000..3d4f64baa722
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet differ
diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/float_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/float_with_nulls.parquet
new file mode 100644
index 000000000000..c9abb9b98def
Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/float_with_nulls.parquet differ
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 6e823a8bfc05..a3ca1fe9fc4b 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -81,7 +81,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
 
   private static final String PLAIN = "PLAIN";
   private static final List GOLDEN_FILE_ENCODINGS =
-      ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+      ImmutableList.of(
+          "PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "BYTE_STREAM_SPLIT");
   private static final Map GOLDEN_FILE_TYPES =
       ImmutableMap.of(
           "string", Types.StringType.get(),
@@ -89,7 +90,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
           "int32", Types.IntegerType.get(),
           "int64", Types.LongType.get(),
           "binary", Types.BinaryType.get(),
-          "boolean", Types.BooleanType.get());
+          "boolean", Types.BooleanType.get(),
+          "double", Types.DoubleType.get());
 
   static final Function IDENTITY = record -> record;
 
@@ -504,10 +506,16 @@ static Stream goldenFilesAndEncodings() {
                     .flatMap(
                         e ->
                             Stream.of(true, false)
-                                .map(
+                                .flatMap(
                                     vectorized ->
-                                        Arguments.of(
-                                            encoding, e.getKey(), e.getValue(), vectorized))));
+                                        Stream.of(
+                                            Arguments.of(
+                                                encoding, e.getKey(), e.getValue(), vectorized),
+                                            Arguments.of(
+                                                encoding,
+                                                e.getKey() + "_with_nulls",
+                                                e.getValue(),
+                                                vectorized)))));
   }
 
   private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 46a6a302e1c4..89368e23f731 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -81,7 +81,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
 
   private static final String PLAIN = "PLAIN";
   private static final List GOLDEN_FILE_ENCODINGS =
-      ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+      ImmutableList.of(
+          "PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "BYTE_STREAM_SPLIT");
   private static final Map GOLDEN_FILE_TYPES =
       ImmutableMap.of(
           "string", Types.StringType.get(),
@@ -89,7 +90,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
           "int32", Types.IntegerType.get(),
           "int64", Types.LongType.get(),
           "binary", Types.BinaryType.get(),
-          "boolean", Types.BooleanType.get());
+          "boolean", Types.BooleanType.get(),
+          "double", Types.DoubleType.get());
 
   static final Function IDENTITY = record -> record;
 
@@ -490,10 +492,16 @@ static Stream goldenFilesAndEncodings() {
                     .flatMap(
                         e ->
                             Stream.of(true, false)
-                                .map(
+                                .flatMap(
                                     vectorized ->
-                                        Arguments.of(
-                                            encoding, e.getKey(), e.getValue(), vectorized))));
+                                        Stream.of(
+                                            Arguments.of(
+                                                encoding, e.getKey(), e.getValue(), vectorized),
+                                            Arguments.of(
+                                                encoding,
+                                                e.getKey() + "_with_nulls",
+                                                e.getValue(),
+                                                vectorized)))));
   }
 
   private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {