Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
a69ec52
rebase
eric-maynard Jun 10, 2025
0bba5ef
lint
eric-maynard Jun 10, 2025
9ecc2be
some changes per comments
eric-maynard Jun 18, 2025
3cd2819
Merge branch 'main' of ssh://github.com-oss/apache/iceberg into parqu…
eric-maynard Jun 18, 2025
8d186fe
javadoc
eric-maynard Jun 23, 2025
5ce8913
lint
eric-maynard Jun 23, 2025
9fe0bba
create class
eric-maynard Jun 23, 2025
6cecf96
remove clash
eric-maynard Jun 23, 2025
2ce2590
Merge branch 'parquet-v2-refactor' of ssh://github.com-oss/eric-mayna…
eric-maynard Jun 23, 2025
3aed168
refactoring
eric-maynard Jun 23, 2025
98d1c5c
clean up
eric-maynard Jun 23, 2025
b72e338
wire up
eric-maynard Jun 23, 2025
b76cc47
tweak header
eric-maynard Jun 25, 2025
ec07775
check in
eric-maynard Jun 25, 2025
c79a77c
resolve conflicts
eric-maynard Jun 26, 2025
1969466
debugging
eric-maynard Jun 27, 2025
d2b173b
debugging
eric-maynard Jun 27, 2025
1f219e5
debugging commit
eric-maynard Jul 1, 2025
21c11d8
move code
eric-maynard Jul 1, 2025
e4bc23f
switch back to floats
eric-maynard Jul 1, 2025
a88af2e
clean a bit
eric-maynard Jul 1, 2025
c375e99
semistable
eric-maynard Jul 1, 2025
f8cfbb2
polish
eric-maynard Jul 1, 2025
9d27297
stable:
eric-maynard Jul 1, 2025
d75f85e
spotless; polish
eric-maynard Jul 1, 2025
03f6395
spotless
eric-maynard Jul 1, 2025
c39570d
fix lints
eric-maynard Jul 2, 2025
1ac89a9
initial impl
eric-maynard Jul 2, 2025
ddeadf7
convinced I need to use a golden file
eric-maynard Jul 2, 2025
dc75fc4
resolve conflicts
eric-maynard Jul 30, 2025
f86b93c
resolve more conflicts
eric-maynard Jul 30, 2025
5117f9f
license
eric-maynard Jul 30, 2025
c7e5a68
revert
eric-maynard Jul 30, 2025
db99901
spotless
eric-maynard Aug 2, 2025
4528490
lint
eric-maynard Aug 4, 2025
fa3806d
Merge branch 'main' of ssh://github.com-oss/apache/iceberg into DELTA…
eric-maynard Aug 12, 2025
679390e
add golden file
eric-maynard Aug 12, 2025
6f5eeee
spotless
eric-maynard Aug 12, 2025
9f27974
spotless
eric-maynard Aug 12, 2025
b8173d6
change value
eric-maynard Aug 14, 2025
4d86a46
resolve conflicts
eric-maynard Aug 25, 2025
9954a43
spotless
eric-maynard Aug 25, 2025
29e59c5
change readBinary path
eric-maynard Aug 26, 2025
dbfd7bb
lint
eric-maynard Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ This product includes code from Apache Spark.
* implementation of SetAccumulator.
* Connector expressions.
* implementation of VectorizedDeltaEncodedValuesReader
* implementation of VectorizedDeltaLengthByteArrayValuesReader

Copyright: 2011-2018 The Apache Software Foundation
Home page: https://spark.apache.org/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce
firstValue = BytesUtils.readZigZagVarLong(this.inputStream);
}

// Returns the number of values actually encoded in this page. May be less than the
// valueCount passed to initFromPage, because null values are not encoded in the data page.
int getTotalValueCount() {
return totalValueCount;
}

/** DELTA_BINARY_PACKED only supports INT32 and INT64 */
@Override
public byte readByte() {
Expand Down Expand Up @@ -149,6 +154,12 @@ public void readDoubles(int total, FieldVector vec, int rowId) {
throw new UnsupportedOperationException("readDoubles is not supported");
}

/**
 * DELTA_BINARY_PACKED only encodes INT32 and INT64 values, so vectorized binary reads are
 * rejected.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) {
throw new UnsupportedOperationException("readBinary is not supported");
}

private void readValues(
int total, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter) {
if (valuesRead + total > totalValueCount) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.arrow.vectorized.parquet;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.function.IntUnaryOperator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;

/**
 * A {@link VectorizedValuesReader} for Parquet's DELTA_LENGTH_BYTE_ARRAY encoding, in which all
 * value lengths are stored at the head of the page as a DELTA_BINARY_PACKED block, followed by
 * the concatenated value bytes.
 *
 * <p>Only the binary read methods are supported; every other type throws
 * {@link UnsupportedOperationException}.
 */
public class VectorizedDeltaLengthByteArrayValuesReader
    implements VectorizedValuesReader, AutoCloseable {

  // Decodes the DELTA_BINARY_PACKED block of value lengths at the head of the page.
  private final VectorizedDeltaEncodedValuesReader lengthReader;
  // Tracks the Arrow vectors allocated per page so they are released in close().
  private final CloseableGroup closeables;

  // Stream positioned at the concatenated value bytes (after the lengths block).
  private ByteBufferInputStream in;
  // Decoded per-value lengths, indexed by value position within the page.
  private IntVector lengthsVector;
  // Scratch holder used by readBinary(int) to capture the sliced buffer.
  private ByteBuffer byteBuffer;

  VectorizedDeltaLengthByteArrayValuesReader() {
    lengthReader = new VectorizedDeltaEncodedValuesReader();
    closeables = new CloseableGroup();
  }

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream inputStream) throws IOException {
    lengthsVector = new IntVector("length-" + UUID.randomUUID(), ArrowAllocation.rootAllocator());
    closeables.addCloseable(lengthsVector);
    lengthReader.initFromPage(valueCount, inputStream);
    // Eagerly decode every length. getTotalValueCount() may be less than valueCount when the
    // page contains nulls, since nulls are not encoded in the data page.
    lengthReader.readIntegers(lengthReader.getTotalValueCount(), lengthsVector, 0);
    this.in = inputStream.remainingStream();
  }

  /**
   * Reads a single binary value of exactly {@code len} bytes from the data stream.
   *
   * <p>Note: this bypasses the decoded lengths vector and trusts the caller-supplied length.
   */
  @Override
  public Binary readBinary(int len) {
    readValues(1, null, 0, x -> len, (f, i, v) -> byteBuffer = v);
    return Binary.fromReusedByteBuffer(byteBuffer);
  }

  /** Reads {@code total} binary values into {@code vec} starting at {@code vec[rowId]}. */
  @Override
  public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) {
    // NOTE(review): setArrowValidityVector is ignored on this path; validity bits are presumably
    // set by the caller -- confirm against the definition-level reader.
    readValues(
        total,
        vec,
        rowId,
        x -> lengthsVector.get(x),
        (f, i, v) -> f.getDataBuffer().setBytes(i, v));
  }

  @SuppressWarnings("UnusedVariable")
  private void readValues(
      int total,
      FieldVector vec,
      int rowId,
      IntUnaryOperator getLength,
      BinaryOutputWriter outputWriter) {
    for (int i = 0; i < total; i++) {
      int length = getLength.applyAsInt(rowId + i);
      if (length < 0) {
        // A negative length indicates a corrupt page. Zero-length values are legal: the
        // DELTA_LENGTH_BYTE_ARRAY encoding represents empty byte arrays with length 0.
        throw new IllegalStateException("Invalid length: " + length);
      }
      ByteBuffer buffer;
      try {
        buffer = in.slice(length);
      } catch (EOFException e) {
        // Chain the cause so the underlying stream position information is not lost.
        throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
      }
      outputWriter.write(vec, rowId + i, buffer);
    }
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public boolean readBoolean() {
    throw new UnsupportedOperationException("readBoolean is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public byte readByte() {
    throw new UnsupportedOperationException("readByte is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public short readShort() {
    throw new UnsupportedOperationException("readShort is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public int readInteger() {
    throw new UnsupportedOperationException("readInteger is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public long readLong() {
    throw new UnsupportedOperationException("readLong is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public float readFloat() {
    throw new UnsupportedOperationException("readFloat is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public double readDouble() {
    throw new UnsupportedOperationException("readDouble is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public void readIntegers(int total, FieldVector vec, int rowId) {
    throw new UnsupportedOperationException("readIntegers is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public void readLongs(int total, FieldVector vec, int rowId) {
    throw new UnsupportedOperationException("readLongs is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public void readFloats(int total, FieldVector vec, int rowId) {
    throw new UnsupportedOperationException("readFloats is not supported");
  }

  /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */
  @Override
  public void readDoubles(int total, FieldVector vec, int rowId) {
    throw new UnsupportedOperationException("readDoubles is not supported");
  }

  @Override
  public void close() throws Exception {
    closeables.close();
  }

  /** A functional interface to write binary values into a FieldVector */
  @FunctionalInterface
  interface BinaryOutputWriter {

    /**
     * A functional interface that can be used to write a binary value to a specified row in a
     * FieldVector
     *
     * @param vec a FieldVector to write the value into
     * @param index The offset to write to
     * @param val value to write
     */
    void write(FieldVector vec, long index, ByteBuffer val);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i
case DELTA_BINARY_PACKED:
valuesReader = new VectorizedDeltaEncodedValuesReader();
break;
case DELTA_LENGTH_BYTE_ARRAY:
valuesReader = new VectorizedDeltaLengthByteArrayValuesReader();
break;
default:
throw new UnsupportedOperationException(
"Cannot support vectorized reads for column "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,20 +584,7 @@ protected void nextVal(
VectorizedValuesReader valuesReader,
int typeWidth,
byte[] byteArray) {
int len = valuesReader.readInteger();
ByteBuffer buffer = valuesReader.readBinary(len).toByteBuffer();
// Calling setValueLengthSafe takes care of allocating a larger buffer if
// running out of space.
((BaseVariableWidthVector) vector).setValueLengthSafe(idx, len);
int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx);
// It is possible that the data buffer was reallocated. So it is important to
// not cache the data buffer reference but instead use vector.getDataBuffer().
vector.getDataBuffer().setBytes(startOffset, buffer);
// Similarly, we need to get the latest reference to the validity buffer as well
// since reallocation changes reference of the validity buffers as well.
if (setArrowValidityVector) {
BitVectorHelper.setBit(vector.getValidityBuffer(), idx);
}
valuesReader.readBinary(1, vector, idx, setArrowValidityVector);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.iceberg.arrow.vectorized.parquet;

import java.nio.ByteBuffer;
import org.apache.arrow.vector.BaseVariableWidthVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.FieldVector;
import org.apache.iceberg.parquet.ValuesAsBytesReader;
import org.apache.parquet.io.api.Binary;
Expand Down Expand Up @@ -74,4 +76,22 @@ public void readFloats(int total, FieldVector vec, int rowId) {
public void readDoubles(int total, FieldVector vec, int rowId) {
readValues(total, vec, rowId, DOUBLE_SIZE);
}

/**
 * Reads {@code total} binary values into {@code vec} starting at {@code vec[rowId]}. Each value
 * is read as a length (via {@code readInteger()}) followed by that many bytes.
 *
 * <p>The previous implementation read exactly one value and ignored {@code total}, which
 * contradicts the {@code VectorizedValuesReader} contract; looping is identical in behavior for
 * {@code total == 1}, the only value visible call sites pass.
 */
@Override
public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) {
  BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) vec;
  for (int i = 0; i < total; i++) {
    int idx = rowId + i;
    int len = readInteger();
    ByteBuffer buffer = readBinary(len).toByteBuffer();
    // Calling setValueLengthSafe takes care of allocating a larger buffer if
    // running out of space.
    varWidthVector.setValueLengthSafe(idx, len);
    int startOffset = varWidthVector.getStartOffset(idx);
    // It is possible that the data buffer was reallocated. So it is important to
    // not cache the data buffer reference but instead use vector.getDataBuffer().
    vec.getDataBuffer().setBytes(startOffset, buffer);
    // Similarly, we need to get the latest reference to the validity buffer as well
    // since reallocation changes reference of the validity buffers as well.
    if (setArrowValidityVector) {
      BitVectorHelper.setBit(vec.getValidityBuffer(), idx);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ interface VectorizedValuesReader {
double readDouble();

/**
* Read binary data of some length
* Read a single binary value of some length
*
* @param len The number of bytes to read
*/
Expand All @@ -76,6 +76,9 @@ interface VectorizedValuesReader {
/** Read `total` doubles into `vec` starting at `vec[rowId]` */
void readDoubles(int total, FieldVector vec, int rowId);

/** Read `total` binary values into `vec` starting at `vec[rowId]` */
void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector);

/**
* Initialize the reader from a page. See {@link ValuesReader#initFromPage(int,
* ByteBufferInputStream)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {

private static final String PLAIN = "PLAIN";
private static final List<String> GOLDEN_FILE_ENCODINGS =
ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
ImmutableList.of(
"PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "DELTA_LENGTH_BYTE_ARRAY");
private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
ImmutableMap.of(
"string", Types.StringType.get(),
Expand Down
Binary file not shown.
Loading