Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,21 @@ public static List<DataField> fieldsInBlobFile(RowType rowType, Set<String> desc
rowType.getFields()
.forEach(
field -> {
DataTypeRoot type = field.type().getTypeRoot();
if (type == DataTypeRoot.BLOB
if (isBlobFileField(field.type())
&& !descriptorFields.contains(field.name())) {
result.add(field);
}
});
return result;
}

public static boolean isBlobFileField(DataType type) {
if (type.getTypeRoot() == DataTypeRoot.BLOB) {
return true;
}
if (type.getTypeRoot() == DataTypeRoot.ARRAY) {
return ((ArrayType) type).getElementType().getTypeRoot() == DataTypeRoot.BLOB;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data;

import org.apache.paimon.data.variant.Variant;

import java.io.Serializable;

/**
* Placeholder for an array blob field in data-evolution blob files. It should never be exposed to
* users.
*/
public final class BlobArrayPlaceholder implements InternalArray, Serializable {

private static final long serialVersionUID = 1L;

public static final BlobArrayPlaceholder INSTANCE = new BlobArrayPlaceholder();

private BlobArrayPlaceholder() {}

private Object readResolve() {
return INSTANCE;
}

private static UnsupportedOperationException unsupported() {
return new UnsupportedOperationException(
"Should never call this method for placeholder blob array.");
}

@Override
public int size() {
throw unsupported();
}

@Override
public boolean isNullAt(int pos) {
throw unsupported();
}

@Override
public boolean getBoolean(int pos) {
throw unsupported();
}

@Override
public byte getByte(int pos) {
throw unsupported();
}

@Override
public short getShort(int pos) {
throw unsupported();
}

@Override
public int getInt(int pos) {
throw unsupported();
}

@Override
public long getLong(int pos) {
throw unsupported();
}

@Override
public float getFloat(int pos) {
throw unsupported();
}

@Override
public double getDouble(int pos) {
throw unsupported();
}

@Override
public BinaryString getString(int pos) {
throw unsupported();
}

@Override
public Decimal getDecimal(int pos, int precision, int scale) {
throw unsupported();
}

@Override
public Timestamp getTimestamp(int pos, int precision) {
throw unsupported();
}

@Override
public byte[] getBinary(int pos) {
throw unsupported();
}

@Override
public Variant getVariant(int pos) {
throw unsupported();
}

@Override
public Blob getBlob(int pos) {
throw unsupported();
}

@Override
public InternalArray getArray(int pos) {
throw unsupported();
}

@Override
public InternalVector getVector(int pos) {
throw unsupported();
}

@Override
public InternalMap getMap(int pos) {
throw unsupported();
}

@Override
public InternalRow getRow(int pos, int numFields) {
throw unsupported();
}

@Override
public boolean[] toBooleanArray() {
throw unsupported();
}

@Override
public byte[] toByteArray() {
throw unsupported();
}

@Override
public short[] toShortArray() {
throw unsupported();
}

@Override
public int[] toIntArray() {
throw unsupported();
}

@Override
public long[] toLongArray() {
throw unsupported();
}

@Override
public float[] toFloatArray() {
throw unsupported();
}

@Override
public double[] toDoubleArray() {
throw unsupported();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ static Class<?> getDataClass(DataType type) {
return InternalMap.class;
case ROW:
return InternalRow.class;
case BLOB:
return Blob.class;
default:
throw new IllegalArgumentException("Illegal type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import static java.util.Comparator.comparingLong;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.manifest.ManifestFileMeta.allContainsRowId;
import static org.apache.paimon.types.DataTypeRoot.BLOB;
import static org.apache.paimon.types.BlobType.isBlobFileField;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -100,7 +100,7 @@ public DataEvolutionCompactCoordinator(
? table.rowType().getFields().stream()
.filter(
field ->
field.type().is(BLOB)
isBlobFileField(field.type())
&& !blobInlineFields.contains(
field.name()))
.map(DataField::id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.paimon.operation;

import org.apache.paimon.append.ForceSingleBatchReader;
import org.apache.paimon.data.BlobArrayPlaceholder;
import org.apache.paimon.data.BlobPlaceholder;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
Expand Down Expand Up @@ -55,7 +57,8 @@
public class BlobFallbackRecordReader implements RecordReader<InternalRow> {

private final List<RecordReader<InternalRow>> groupReaders = new ArrayList<>();
private final int blobIndex;
private final Object blobPlaceholder;
private final InternalRow.FieldGetter blobGetter;
private boolean returned;

BlobFallbackRecordReader(
Expand All @@ -64,7 +67,9 @@ public class BlobFallbackRecordReader implements RecordReader<InternalRow> {
List<Range> rowRanges,
RowType readRowType,
int blobIndex) {
this.blobIndex = blobIndex;
this.blobPlaceholder = blobPlaceholder(readRowType, blobIndex);
this.blobGetter =
InternalRow.createFieldGetter(readRowType.getTypeAt(blobIndex), blobIndex);

checkArgument(!files.isEmpty(), "Blob bunch should not be empty.");
long firstRowId = Long.MAX_VALUE;
Expand Down Expand Up @@ -188,7 +193,13 @@ public void releaseBatch() {
}

private boolean isPlaceHolder(InternalRow row) {
return !row.isNullAt(blobIndex) && row.getBlob(blobIndex) == BlobPlaceholder.INSTANCE;
return blobGetter.getFieldOrNull(row) == blobPlaceholder;
}

private static Object blobPlaceholder(RowType rowType, int blobIndex) {
return rowType.getTypeAt(blobIndex).getTypeRoot() == DataTypeRoot.ARRAY
? BlobArrayPlaceholder.INSTANCE
: BlobPlaceholder.INSTANCE;
}

@Override
Expand Down Expand Up @@ -263,6 +274,7 @@ public static class BlobSequenceGroupRecordReader implements RecordReader<Intern
private final List<Range> rowRanges;
private final RowType readRowType;
private final int blobIndex;
private final Object blobPlaceholder;
private final long lastRowId;

private RecordReader<InternalRow> currentReader;
Expand All @@ -287,6 +299,7 @@ public static class BlobSequenceGroupRecordReader implements RecordReader<Intern
this.rowRanges = rowRanges == null ? null : Range.sortAndMergeOverlap(rowRanges);
this.readRowType = readRowType;
this.blobIndex = blobIndex;
this.blobPlaceholder = blobPlaceholder(readRowType, blobIndex);
this.lastRowId = lastRowId;

this.nextFileIndex = 0;
Expand Down Expand Up @@ -391,7 +404,7 @@ public void releaseBatch() {
private InternalRow placeHolderRow() {
if (placeholderRow == null) {
GenericRow row = new GenericRow(readRowType.getFieldCount());
row.setField(blobIndex, BlobPlaceholder.INSTANCE);
row.setField(blobIndex, blobPlaceholder);
placeholderRow = row;
}
return placeholderRow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,14 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.util.Set;

import static org.apache.paimon.types.DataTypeRoot.BLOB;

/** Context for blob file. */
public class BlobFileContext {

Expand All @@ -53,7 +51,7 @@ private BlobFileContext(

@Nullable
public static BlobFileContext create(RowType rowType, CoreOptions options) {
if (rowType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
if (rowType.getFieldTypes().stream().noneMatch(BlobType::isBlobFileField)) {
return null;
}
Set<String> descriptorFields = options.blobDescriptorField();
Expand All @@ -62,8 +60,7 @@ public static BlobFileContext create(RowType rowType, CoreOptions options) {
String externalStoragePath = options.blobExternalStoragePath();
boolean requireBlobFile = false;
for (DataField field : rowType.getFields()) {
DataTypeRoot type = field.type().getTypeRoot();
if (type == DataTypeRoot.BLOB
if (BlobType.isBlobFileField(field.type())
&& (!inlineFields.contains(field.name())
|| externalStorageField.contains(field.name()))) {
requireBlobFile = true;
Expand All @@ -83,7 +80,7 @@ public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) {
}

public BlobFileContext withWriteType(RowType writeType) {
if (writeType.getFieldTypes().stream().noneMatch(t -> t.is(BLOB))) {
if (writeType.getFieldTypes().stream().noneMatch(BlobType::isBlobFileField)) {
return null;
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FormatReaderMapping;
Expand Down Expand Up @@ -76,6 +75,7 @@
import static java.util.Comparator.comparingLong;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
import static org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking;
import static org.apache.paimon.types.BlobType.isBlobFileField;
import static org.apache.paimon.types.VectorType.isVectorStoreFile;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -419,7 +419,7 @@ private RecordReader<InternalRow> sequentialReadFiles(

private static int findBlobFieldIndex(RowType rowType) {
for (int i = 0; i < rowType.getFieldCount(); i++) {
if (rowType.getTypeAt(i).getTypeRoot() == DataTypeRoot.BLOB) {
if (isBlobFileField(rowType.getTypeAt(i))) {
return i;
}
}
Expand Down
Loading
Loading