
Commit c3d50e1

Arrow, Parquet: Add support for DELTA_BINARY_PACKED Parquet encoding (#13391)
1 parent a519cb2 commit c3d50e1

7 files changed

Lines changed: 357 additions & 42 deletions


LICENSE

Lines changed: 1 addition & 0 deletions
@@ -288,6 +288,7 @@ This product includes code from Apache Spark.
 * casting logic in AssignmentAlignmentSupport
 * implementation of SetAccumulator.
 * Connector expressions.
+* implementation of VectorizedDeltaEncodedValuesReader

 Copyright: 2011-2018 The Apache Software Foundation
 Home page: https://spark.apache.org/
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java

Lines changed: 283 additions & 0 deletions
@@ -0,0 +1,283 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.arrow.vectorized.parquet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.arrow.vector.FieldVector;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;

/**
 * A {@link VectorizedValuesReader} implementation for the encoding type DELTA_BINARY_PACKED. This
 * is adapted from Spark's VectorizedDeltaBinaryPackedReader.
 *
 * @see <a
 *     href="https://github.com/apache/parquet-format/blob/master/Encodings.md#delta-encoding-delta_binary_packed--5">
 *     Parquet format encodings: DELTA_BINARY_PACKED</a>
 */
public class VectorizedDeltaEncodedValuesReader extends ValuesReader
    implements VectorizedValuesReader {

  // header data
  private int blockSizeInValues;
  private int miniBlocksPerBlock;
  private int totalValueCount;
  private long firstValue;

  private int miniBlockSizeInValues;

  // values read by the caller
  private int valuesRead = 0;

  // variables to keep state of the current block and miniblock
  private long lastValueRead; // needed to compute the next value
  private long minDeltaInCurrentBlock; // needed to compute the next value
  // currentMiniBlock keeps track of the mini block within the current block that
  // we read and decoded most recently. Only used as an index into
  // bitWidths array
  private int currentMiniBlock = 0;
  private int[] bitWidths; // bit widths for each miniBlock in the current block
  private int remainingInBlock = 0; // values in current block still to be read
  private int remainingInMiniBlock = 0; // values in current mini block still to be read
  private long[] unpackedValuesBuffer;

  private ByteBufferInputStream inputStream;

  // temporary buffers used by readInteger and readLong
  private int intVal;
  private long longVal;

  @Override
  public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
    Preconditions.checkArgument(
        valueCount >= 1, "Page must have at least one value, but it has " + valueCount);
    this.inputStream = in;
    // Read the header
    this.blockSizeInValues = BytesUtils.readUnsignedVarInt(this.inputStream);
    this.miniBlocksPerBlock = BytesUtils.readUnsignedVarInt(this.inputStream);
    double miniSize = (double) blockSizeInValues / miniBlocksPerBlock;
    Preconditions.checkArgument(
        miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
    this.miniBlockSizeInValues = (int) miniSize;
    // True value count. May be less than valueCount because of nulls
    this.totalValueCount = BytesUtils.readUnsignedVarInt(this.inputStream);
    this.bitWidths = new int[miniBlocksPerBlock];
    this.unpackedValuesBuffer = new long[miniBlockSizeInValues];
    // read the first value
    firstValue = BytesUtils.readZigZagVarLong(this.inputStream);
  }

  /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
  @Override
  public byte readByte() {
    throw new UnsupportedOperationException("readByte is not supported");
  }

  /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
  @Override
  public short readShort() {
    throw new UnsupportedOperationException("readShort is not supported");
  }

  @Override
  public int readInteger() {
    readValues(1, null, 0, INT_SIZE, (f, i, v) -> intVal = (int) v);
    return intVal;
  }

  @Override
  public long readLong() {
    readValues(1, null, 0, LONG_SIZE, (f, i, v) -> longVal = v);
    return longVal;
  }

  /** The Iceberg reader currently does not do skipping */
  @Override
  public void skip() {
    throw new UnsupportedOperationException("skip is not supported");
  }

  /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
  @Override
  public Binary readBinary(int len) {
    throw new UnsupportedOperationException("readBinary is not supported");
  }

  @Override
  public void readIntegers(int total, FieldVector vec, int rowId) {
    readValues(total, vec, rowId, INT_SIZE, (f, i, v) -> f.getDataBuffer().setInt(i, (int) v));
  }

  @Override
  public void readLongs(int total, FieldVector vec, int rowId) {
    readValues(total, vec, rowId, LONG_SIZE, (f, i, v) -> f.getDataBuffer().setLong(i, v));
  }

  /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
  @Override
  public void readFloats(int total, FieldVector vec, int rowId) {
    throw new UnsupportedOperationException("readFloats is not supported");
  }

  /** DELTA_BINARY_PACKED only supports INT32 and INT64 */
  @Override
  public void readDoubles(int total, FieldVector vec, int rowId) {
    throw new UnsupportedOperationException("readDoubles is not supported");
  }

  private void readValues(
      int total, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter) {
    if (valuesRead + total > totalValueCount) {
      throw new ParquetDecodingException(
          "No more values to read. Total values read: "
              + valuesRead
              + ", total count: "
              + totalValueCount
              + ", trying to read "
              + total
              + " more.");
    }

    int remaining = total;
    int currentRowId = rowId;
    // First value
    if (valuesRead == 0) {
      outputWriter.write(vec, ((long) (currentRowId + valuesRead) * typeWidth), firstValue);
      lastValueRead = firstValue;
      currentRowId++;
      remaining--;
    }

    while (remaining > 0) {
      int loadedRows;
      try {
        loadedRows = loadMiniBlockToOutput(remaining, vec, currentRowId, typeWidth, outputWriter);
      } catch (IOException e) {
        throw new ParquetDecodingException("Error reading mini block.", e);
      }
      currentRowId += loadedRows;
      remaining -= loadedRows;
    }
    valuesRead = total - remaining;
  }

  /**
   * Read from a mini block. Read at most 'remaining' values into output.
   *
   * @return the number of values read into output
   */
  private int loadMiniBlockToOutput(
      int remaining, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter)
      throws IOException {

    // new block; read the block header
    if (remainingInBlock == 0) {
      readBlockHeader();
    }

    // new miniblock, unpack the miniblock
    if (remainingInMiniBlock == 0) {
      unpackMiniBlock();
    }

    // read values from miniblock
    int valuesReadInMiniBlock = 0;
    for (int i = miniBlockSizeInValues - remainingInMiniBlock;
        i < miniBlockSizeInValues && valuesReadInMiniBlock < remaining;
        i++) {
      // calculate values from deltas unpacked for current block
      long outValue = lastValueRead + minDeltaInCurrentBlock + unpackedValuesBuffer[i];
      lastValueRead = outValue;
      outputWriter.write(vec, ((long) (rowId + valuesReadInMiniBlock) * typeWidth), outValue);
      remainingInBlock--;
      remainingInMiniBlock--;
      valuesReadInMiniBlock++;
    }

    return valuesReadInMiniBlock;
  }

  private void readBlockHeader() {
    try {
      minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(inputStream);
    } catch (IOException e) {
      throw new ParquetDecodingException("Can not read min delta in current block", e);
    }
    readBitWidthsForMiniBlocks();
    remainingInBlock = blockSizeInValues;
    currentMiniBlock = 0;
    remainingInMiniBlock = 0;
  }

  /**
   * A mini block has a size of 8*n values; unpack 8 values at a time.
   *
   * <p>see org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader#unpackMiniBlock
   */
  private void unpackMiniBlock() throws IOException {
    Arrays.fill(this.unpackedValuesBuffer, 0);
    BytePackerForLong packer =
        Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidths[currentMiniBlock]);
    for (int j = 0; j < miniBlockSizeInValues; j += 8) {
      ByteBuffer buffer = inputStream.slice(packer.getBitWidth());
      if (buffer.hasArray()) {
        packer.unpack8Values(
            buffer.array(), buffer.arrayOffset() + buffer.position(), unpackedValuesBuffer, j);
      } else {
        packer.unpack8Values(buffer, buffer.position(), unpackedValuesBuffer, j);
      }
    }
    remainingInMiniBlock = miniBlockSizeInValues;
    currentMiniBlock++;
  }

  // From org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader
  private void readBitWidthsForMiniBlocks() {
    for (int i = 0; i < miniBlocksPerBlock; i++) {
      try {
        bitWidths[i] = BytesUtils.readIntLittleEndianOnOneByte(inputStream);
      } catch (IOException e) {
        throw new ParquetDecodingException("Can not decode bitwidth in block header", e);
      }
    }
  }

  /** A functional interface to write long values into a FieldVector */
  @FunctionalInterface
  interface IntegerOutputWriter {

    /**
     * A functional interface that can be used to write a long value to a specified row in a
     * FieldVector
     *
     * @param vec a FieldVector to write the value into
     * @param index The offset to write to
     * @param val value to write
     */
    void write(FieldVector vec, long index, long val);
  }
}
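As a side note, the reconstruction performed in loadMiniBlockToOutput() boils down to value[i] = value[i-1] + minDelta + packedDelta[i]. The following standalone sketch (not part of this commit; all numbers are made up for illustration, since a real page stores them varint/zig-zag encoded and bit-packs the deltas per mini block) walks through that arithmetic for one hypothetical mini block:

// Standalone sketch of the DELTA_BINARY_PACKED reconstruction step.
public class DeltaReconstructionSketch {
  public static void main(String[] args) {
    long firstValue = 100L;                   // first value, from the page header
    long minDelta = -3L;                      // min delta, from the block header
    long[] unpackedDeltas = {0, 5, 3, 3, 7};  // deltas unpacked from one mini block

    long last = firstValue;
    System.out.println(last);                 // 100
    for (long delta : unpackedDeltas) {
      last = last + minDelta + delta;         // same formula as loadMiniBlockToOutput
      System.out.println(last);               // 97, 99, 99, 99, 103
    }
  }
}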

arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java

Lines changed: 15 additions & 10 deletions
@@ -93,16 +93,21 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i
        throw new ParquetDecodingException("could not read page in col " + desc, e);
      }
    } else {
-      if (dataEncoding == Encoding.PLAIN) {
-        valuesReader = new VectorizedPlainValuesReader();
-      } else {
-        throw new UnsupportedOperationException(
-            "Cannot support vectorized reads for column "
-                + desc
-                + " with "
-                + "encoding "
-                + dataEncoding
-                + ". Disable vectorized reads to read this table/file");
+      switch (dataEncoding) {
+        case PLAIN:
+          valuesReader = new VectorizedPlainValuesReader();
+          break;
+        case DELTA_BINARY_PACKED:
+          valuesReader = new VectorizedDeltaEncodedValuesReader();
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              "Cannot support vectorized reads for column "
+                  + desc
+                  + " with "
+                  + "encoding "
+                  + dataEncoding
+                  + ". Disable vectorized reads to read this table/file");
      }
      try {
        valuesReader.initFromPage(valueCount, in);
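To see the two pieces together, here is a rough usage sketch (not from this commit) that hands the new reader a hand-assembled DELTA_BINARY_PACKED page containing only the header. With a total value count of 1 the reader never has to touch a block, so initFromPage() followed by readInteger() is enough to get the first value back; the byte values below are an assumption written out by hand from the encoding spec.

import java.nio.ByteBuffer;
import org.apache.iceberg.arrow.vectorized.parquet.VectorizedDeltaEncodedValuesReader;
import org.apache.parquet.bytes.ByteBufferInputStream;

public class DeltaReaderUsageSketch {
  public static void main(String[] args) throws Exception {
    // Hand-built page header: block size 128 (varint 0x80 0x01), 4 mini blocks per block,
    // total value count 1, first value 7 (zig-zag varint 14 = 0x0E). No blocks follow.
    byte[] page = new byte[] {(byte) 0x80, 0x01, 0x04, 0x01, 0x0E};

    VectorizedDeltaEncodedValuesReader reader = new VectorizedDeltaEncodedValuesReader();
    reader.initFromPage(1, ByteBufferInputStream.wrap(ByteBuffer.wrap(page)));
    System.out.println(reader.readInteger()); // prints 7
  }
}

In VectorizedPageIterator the same calls happen on the iterator's behalf: initDataReader() picks the reader in the switch above and calls initFromPage(), and the vectorized column readers then use readIntegers()/readLongs() to fill an Arrow FieldVector instead of reading single values.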
