diff --git a/LICENSE b/LICENSE
index 80cfd3652e69..5b7355e1c349 100644
--- a/LICENSE
+++ b/LICENSE
@@ -289,6 +289,7 @@ This product includes code from Apache Spark.
 * implementation of SetAccumulator.
 * Connector expressions.
 * implementation of VectorizedDeltaEncodedValuesReader
+* implementation of VectorizedDeltaLengthByteArrayValuesReader
 
 Copyright: 2011-2018 The Apache Software Foundation
 Home page: https://spark.apache.org/
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java
index 115518e1fb50..efd631557cdf 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java
@@ -91,6 +91,11 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
     firstValue = BytesUtils.readZigZagVarLong(this.inputStream);
   }
 
+  // True value count. May be less than valueCount because of nulls
+  int getTotalValueCount() {
+    return totalValueCount;
+  }
+
   /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
   @Override
   public byte readByte() {
@@ -149,6 +154,12 @@ public void readDoubles(int total, FieldVector vec, int rowId) {
     throw new UnsupportedOperationException("readDoubles is not supported");
   }
 
+  /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
+  @Override
+  public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) {
+    throw new UnsupportedOperationException("readBinary is not supported");
+  }
+
   private void readValues(
       int total, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter) {
     if (valuesRead + total > totalValueCount) {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaLengthByteArrayValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaLengthByteArrayValuesReader.java
new file mode 100644
index 000000000000..3bfeb10bd7fe
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaLengthByteArrayValuesReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.function.IntUnaryOperator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
+
+public class VectorizedDeltaLengthByteArrayValuesReader
+    implements VectorizedValuesReader, AutoCloseable {
+
+  private final VectorizedDeltaEncodedValuesReader lengthReader;
+  private final CloseableGroup closeables;
+
+  private ByteBufferInputStream in;
+  private IntVector lengthsVector;
+  private ByteBuffer byteBuffer;
+
+  VectorizedDeltaLengthByteArrayValuesReader() {
+    lengthReader = new VectorizedDeltaEncodedValuesReader();
+    closeables = new CloseableGroup();
+  }
+
+  @Override
+  public void initFromPage(int valueCount, ByteBufferInputStream inputStream) throws IOException {
+    lengthsVector = new IntVector("length-" + UUID.randomUUID(), ArrowAllocation.rootAllocator());
+    closeables.addCloseable(lengthsVector);
+    lengthReader.initFromPage(valueCount, inputStream);
+    lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
+    this.in = inputStream.remainingStream();
+  }
+
+  @Override
+  public Binary readBinary(int len) {
+    readValues(1, null, 0, x -> len, (f, i, v) -> byteBuffer = v);
+    return Binary.fromReusedByteBuffer(byteBuffer);
+  }
+
+  @Override
+  public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) {
+    readValues(
+        total,
+        vec,
+        rowId,
+        x -> lengthsVector.get(x),
+        (f, i, v) -> f.getDataBuffer().setBytes(i, v));
+  }
+
+  @SuppressWarnings("UnusedVariable")
+  private void readValues(
+      int total,
+      FieldVector vec,
+      int rowId,
+      IntUnaryOperator getLength,
+      BinaryOutputWriter outputWriter) {
+    ByteBuffer buffer;
+    for (int i = 0; i < total; i++) {
+      int length = getLength.applyAsInt(rowId + i);
+      try {
+        if (length <= 0) {
+          throw new IllegalStateException("Invalid length: " + length);
+        }
+        buffer = in.slice(length);
+      } catch (EOFException e) {
+        throw new ParquetDecodingException("Failed to read " + length + " bytes");
+      }
+      outputWriter.write(vec, rowId + i, buffer);
+    }
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public boolean readBoolean() {
+    throw new UnsupportedOperationException("readBoolean is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public byte readByte() {
+    throw new UnsupportedOperationException("readByte is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public short readShort() {
+    throw new UnsupportedOperationException("readShort is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public int readInteger() {
+    throw new UnsupportedOperationException("readInteger is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public long readLong() {
+    throw new UnsupportedOperationException("readLong is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public float readFloat() {
+    throw new UnsupportedOperationException("readFloat is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public double readDouble() {
+    throw new UnsupportedOperationException("readDouble is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public void readIntegers(int total, FieldVector vec, int rowId) {
+    throw new UnsupportedOperationException("readIntegers is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public void readLongs(int total, FieldVector vec, int rowId) {
+    throw new UnsupportedOperationException("readLongs is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public void readFloats(int total, FieldVector vec, int rowId) {
+    throw new UnsupportedOperationException("readFloats is not supported");
+  }
+
+  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
+  @Override
+  public void readDoubles(int total, FieldVector vec, int rowId) {
+    throw new UnsupportedOperationException("readDoubles is not supported");
+  }
+
+  @Override
+  public void close() throws Exception {
+    closeables.close();
+  }
+
+  /** A functional interface to write binary values into a FieldVector */
+  @FunctionalInterface
+  interface BinaryOutputWriter {
+
+    /**
+     * A functional interface that can be used to write a binary value to a specified row in a
+     * FieldVector
+     *
+     * @param vec a FieldVector to write the value into
+     * @param index The offset to write to
+     * @param val value to write
+     */
+    void write(FieldVector vec, long index, ByteBuffer val);
+  }
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index be1a3324ae43..578d743314a5 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -100,6 +100,9 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i
       case DELTA_BINARY_PACKED:
         valuesReader = new VectorizedDeltaEncodedValuesReader();
         break;
+      case DELTA_LENGTH_BYTE_ARRAY:
+        valuesReader = new VectorizedDeltaLengthByteArrayValuesReader();
+        break;
       default:
         throw new UnsupportedOperationException(
             "Cannot support vectorized reads for column "
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index c7dbe8de7b92..f02032ac9f98 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -584,20 +584,7 @@ protected void nextVal(
         VectorizedValuesReader valuesReader,
         int typeWidth,
         byte[] byteArray) {
-      int len = valuesReader.readInteger();
-      ByteBuffer buffer = valuesReader.readBinary(len).toByteBuffer();
-      // Calling setValueLengthSafe takes care of allocating a larger buffer if
-      // running out of space.
-      ((BaseVariableWidthVector) vector).setValueLengthSafe(idx, len);
-      int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx);
-      // It is possible that the data buffer was reallocated. So it is important to
-      // not cache the data buffer reference but instead use vector.getDataBuffer().
-      vector.getDataBuffer().setBytes(startOffset, buffer);
-      // Similarly, we need to get the latest reference to the validity buffer as well
-      // since reallocation changes reference of the validity buffers as well.
-      if (setArrowValidityVector) {
-        BitVectorHelper.setBit(vector.getValidityBuffer(), idx);
-      }
+      valuesReader.readBinary(1, vector, idx, setArrowValidityVector);
     }
 
     @Override
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java
index 764b2fc353e3..56bcfe3ff8e2 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.arrow.vectorized.parquet;
 
 import java.nio.ByteBuffer;
+import org.apache.arrow.vector.BaseVariableWidthVector;
+import org.apache.arrow.vector.BitVectorHelper;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.iceberg.parquet.ValuesAsBytesReader;
 import org.apache.parquet.io.api.Binary;
@@ -74,4 +76,22 @@ public void readFloats(int total, FieldVector vec, int rowId) {
   public void readDoubles(int total, FieldVector vec, int rowId) {
     readValues(total, vec, rowId, DOUBLE_SIZE);
   }
+
+  @Override
+  public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) {
+    int len = readInteger();
+    ByteBuffer buffer = readBinary(len).toByteBuffer();
+    // Calling setValueLengthSafe takes care of allocating a larger buffer if
+    // running out of space.
+    ((BaseVariableWidthVector) vec).setValueLengthSafe(rowId, len);
+    int startOffset = ((BaseVariableWidthVector) vec).getStartOffset(rowId);
+    // It is possible that the data buffer was reallocated. So it is important to
+    // not cache the data buffer reference but instead use vector.getDataBuffer().
+    vec.getDataBuffer().setBytes(startOffset, buffer);
+    // Similarly, we need to get the latest reference to the validity buffer as well
+    // since reallocation changes reference of the validity buffers as well.
+    if (setArrowValidityVector) {
+      BitVectorHelper.setBit(vec.getValidityBuffer(), rowId);
+    }
+  }
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java
index 7c23149b18ab..48f118f387e9 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java
@@ -58,7 +58,7 @@ interface VectorizedValuesReader {
   double readDouble();
 
   /**
-   * Read binary data of some length
+   * Read a single binary value of some length
    *
    * @param len The number of bytes to read
    */
@@ -76,6 +76,9 @@ interface VectorizedValuesReader {
   /** Read `total` doubles into `vec` starting at `vec[rowId]` */
   void readDoubles(int total, FieldVector vec, int rowId);
 
+  /** Read `total` binary values into `vec` starting at `vec[rowId]` */
+  void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector);
+
   /**
    * Initialize the reader from a page. See {@link ValuesReader#initFromPage(int,
    * ByteBufferInputStream)}.
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 3d7fb6966c23..4c7e0eb9e879 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -74,7 +74,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
   private static final String PLAIN = "PLAIN";
 
   private static final List<String> GOLDEN_FILE_ENCODINGS =
-      ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+      ImmutableList.of(
+          "PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "DELTA_LENGTH_BYTE_ARRAY");
   private static final Map<String, Type> GOLDEN_FILE_TYPES =
       ImmutableMap.of(
           "string", Types.StringType.get(),
diff --git a/spark/v4.0/spark/src/test/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary.parquet b/spark/v4.0/spark/src/test/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary.parquet
new file mode 100644
index 000000000000..959e99bd6228
Binary files /dev/null and b/spark/v4.0/spark/src/test/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary.parquet differ