Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public InlineElement getDescription() {
public static final String FILE_FORMAT_TEXT = "text";
public static final String FILE_FORMAT_JSON = "json";
public static final String FILE_FORMAT_MOSAIC = "mosaic";
public static final String FILE_FORMAT_BLOB = "blob";

public static final ConfigOption<String> FILE_FORMAT =
key("file.format")
Expand Down Expand Up @@ -2908,6 +2909,7 @@ public String formatTableFileCompression() {
case FILE_FORMAT_CSV:
case FILE_FORMAT_TEXT:
case FILE_FORMAT_JSON:
case FILE_FORMAT_BLOB:
return "none";
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -61,8 +62,11 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.AUTO_CREATE;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_BLOB;
import static org.apache.paimon.CoreOptions.FORMAT_TABLE_IMPLEMENTATION;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGACY_NAME;
Expand Down Expand Up @@ -169,12 +173,30 @@ public static void validateCreateTable(Schema schema, boolean dataTokenEnabled)
FORMAT_TABLE_IMPLEMENTATION.key(),
PATH.key());
}
validateBlobFormatTable(schema, options);
}
for (DataField field : schema.fields()) {
validateDefaultValue(field.type(), field.defaultValue());
}
}

/**
 * Validates the column layout of a format table whose {@code file.format} is BLOB.
 *
 * <p>Does nothing when the configured file format is not BLOB (compared case-insensitively).
 * Otherwise the schema must contain exactly one field that is not a partition key, and that
 * field must have the BLOB type root.
 *
 * @param schema schema of the table being created
 * @param options table options the file format is read from
 * @throws IllegalArgumentException presumably, via {@code checkArgument}, when either rule is
 *     violated — confirm against {@code Preconditions}
 */
private static void validateBlobFormatTable(Schema schema, Options options) {
    String configuredFormat = options.get(FILE_FORMAT);
    if (FILE_FORMAT_BLOB.equalsIgnoreCase(configuredFormat)) {
        List<String> partitionKeys = schema.partitionKeys();
        List<DataField> dataFields =
                schema.fields().stream()
                        .filter(candidate -> !partitionKeys.contains(candidate.name()))
                        .collect(Collectors.toList());

        boolean hasSingleDataField = dataFields.size() == 1;
        checkArgument(
                hasSingleDataField, "BLOB format table only supports one non-partition field.");

        // Safe to index: the check above guarantees exactly one element.
        DataTypeRoot dataFieldRoot = dataFields.get(0).type().getTypeRoot();
        checkArgument(
                dataFieldRoot == DataTypeRoot.BLOB,
                "BLOB format table only supports BLOB type as non-partition field.");
    }
}

public static void validateNamePattern(Catalog catalog, String namePattern) {
if (Objects.nonNull(namePattern) && !catalog.supportsListByPattern()) {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileAwareFormatWriter;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SupportsDirectWrite;
Expand Down Expand Up @@ -64,6 +65,10 @@ public FormatTableSingleFileWriter(
out = fileIO.newTwoPhaseOutputStream(path, false);
writer = factory.create(out, compression);
}
if (writer instanceof FileAwareFormatWriter) {
FileAwareFormatWriter fileAwareFormatWriter = (FileAwareFormatWriter) writer;
fileAwareFormatWriter.setFile(path);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setFile(path) is not enough for the withBlobConsumer path. BlobFormatWriter invokes the consumer while writing and the emitted descriptor points at this target path, but this writer is backed by a TwoPhaseOutputStream, so the target file is not visible until FormatTableCommit commits it; if a later write/commit fails, abort()/FormatTableCommit.abort() discards it anyway. This violates the TableWrite.withBlobConsumer contract that these files are left for the caller to clean up, and leaves already-emitted descriptors dangling. Please either make the consumer path use visible/non-deleted files like SingleFileWriter does with deleteFileUponAbort(), or defer/avoid emitting descriptors until the file has actually been committed, and add a failure-path test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi Thanks for your review! But this scenario is a little bit tricky.
Currently FormatTable on DFS uses RENAME to do two-phase commit, so the path set here is not real; it only exists after commit! In that case, if the commit fails and is aborted, it is meaningless to retain the written files, because they are in a temp dir and do not match the path stored in the BlobDescriptors.
(However, in Python no two-phase commit is implemented, so I still retain the written files on abort.)

Here are my thoughts:

  1. Maybe we could explicitly warn users that in FormatTable, returned BlobDescriptors are only valid after commit? Or maybe introduce a PendingBlobDescriptor for format tables, identical to BlobDescriptor except that BlobRef could warn users that the descriptor is still pending, rather than throwing a path-does-not-exist error.
  2. I think this "visible after commit" behavior is acceptable for batch scenarios. For example, in Spark/Ray the FormatTable commit is part of the job, so exported descriptors will be visible only after the job has finished successfully.
  3. Or maybe we do not use two-phase commit for BlobFormatTables, and just filter out the broken files on read?

Thanks again for your review! I'll close this PR and find another way if you think this scenario is not suitable for Paimon FormatTable.

}
} catch (IOException e) {
LOG.warn(
"Failed to open the bulk writer, closing the output stream and throw the error.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public interface FormatTable extends Table {
enum Format {
ORC,
PARQUET,
BLOB,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding BLOB here also exposes format-table projection paths. For a table like (payload BLOB, ds INT) PARTITIONED BY (ds), projecting only ds makes FormatReadBuilder remove partition columns before creating the file reader, so the projectedRowType passed to BlobFileFormat is empty. BlobFileFormat currently requires a BLOB field and throws, whereas other format tables can satisfy partition-only projections. Please handle this case, for example by reading only the blob file metadata to get the row count and then appending partition columns, or by adding an explicit supported projection path with a test.

CSV,
TEXT,
JSON,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BlobConsumer;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.blob.BlobFileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.TwoPhaseOutputStream;
Expand Down Expand Up @@ -78,6 +80,14 @@ public void withWriteType(RowType writeType) {
this.writeRowType = writeType;
}

/**
 * Installs the given consumer as the write consumer of the underlying BLOB file format.
 *
 * @param blobConsumer consumer handed to {@code BlobFileFormat#setWriteConsumer}
 * @throws UnsupportedOperationException if this writer's file format is not a {@link
 *     BlobFileFormat}
 */
public void withBlobConsumer(BlobConsumer blobConsumer) {
    if (fileFormat instanceof BlobFileFormat) {
        BlobFileFormat blobFileFormat = (BlobFileFormat) fileFormat;
        blobFileFormat.setWriteConsumer(blobConsumer);
        return;
    }
    throw new UnsupportedOperationException(
            "BlobConsumer is only supported for BLOB format table.");
}

public void write(BinaryRow partition, InternalRow data) throws Exception {
FormatTableRecordWriter writer = writers.get(partition);
if (writer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public TableWrite withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {

/**
 * Forwards the blob consumer to the underlying {@code write} and returns this instance so calls
 * can be chained.
 *
 * <p>The previous unconditional {@code throw new UnsupportedOperationException()} is removed:
 * left in place it made the delegation below unreachable.
 *
 * @param blobConsumer consumer passed through to {@code write.withBlobConsumer}
 * @return this {@link TableWrite}
 */
@Override
public TableWrite withBlobConsumer(BlobConsumer blobConsumer) {
    write.withBlobConsumer(blobConsumer);
    return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link CatalogUtils}. */
public class CatalogUtilsTest {

@Test
public void testValidateBlobFormatTable() {
assertThatNoException()
.isThrownBy(
() ->
CatalogUtils.validateCreateTable(
Schema.newBuilder()
.column("payload", DataTypes.BLOB())
.column("ds", DataTypes.INT())
.partitionKeys("ds")
.options(blobFormatTableOptions())
.build(),
false));

assertThatThrownBy(
() ->
CatalogUtils.validateCreateTable(
Schema.newBuilder()
.column("payload", DataTypes.BLOB())
.column("id", DataTypes.INT())
.column("ds", DataTypes.INT())
.partitionKeys("ds")
.options(blobFormatTableOptions())
.build(),
false))
.hasMessageContaining("only supports one non-partition field");

assertThatThrownBy(
() ->
CatalogUtils.validateCreateTable(
Schema.newBuilder()
.column("payload", DataTypes.BYTES())
.column("ds", DataTypes.INT())
.partitionKeys("ds")
.options(blobFormatTableOptions())
.build(),
false))
.hasMessageContaining("only supports BLOB type as non-partition field");

assertThatThrownBy(
() ->
CatalogUtils.validateCreateTable(
Schema.newBuilder()
.column("payload", DataTypes.BLOB())
.partitionKeys("payload")
.options(blobFormatTableOptions())
.build(),
false))
.hasMessageContaining("only supports one non-partition field");
}

private Map<String, String> blobFormatTableOptions() {
Map<String, String> options = new HashMap<>();
options.put("type", "format-table");
options.put("file.format", "blob");
return options;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.format;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.Blob;
import org.apache.paimon.data.BlobData;
import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.UriReader;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for BLOB {@link FormatTable}. */
public class FormatTableBlobTest {

// Per-test temporary directory; holds both the table directory and the external blob file.
@TempDir java.nio.file.Path tempPath;

/**
* Writes three rows into partition {@code ds=1} (an in-memory blob, a blob created from a
* descriptor that points at an external file, and a null blob), commits them, and verifies the
* rows read back from the table.
*/
@Test
public void testReadAndWriteBlobDataAndDescriptor() throws Exception {
FormatTable table = createBlobFormatTable();
byte[] first = "hello".getBytes(StandardCharsets.UTF_8);
byte[] second = "world".getBytes(StandardCharsets.UTF_8);
Blob descriptorBlob = createDescriptorBlob(table, second);

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
write.write(GenericRow.of(new BlobData(first), 1));
write.write(GenericRow.of(descriptorBlob, 1));
write.write(GenericRow.of(null, 1));
commit.commit(write.prepareCommit());
}

assertReadRows(table, first, second);
}

/**
* Registers a blob consumer that collects every descriptor it receives, writes one blob row and
* one null row, and checks the collected descriptors once the write has been committed.
*/
@Test
public void testBlobConsumerProducesDescriptors() throws Exception {
FormatTable table = createBlobFormatTable();
byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
List<BlobDescriptor> descriptors = new ArrayList<>();

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
write.withBlobConsumer(
(blobFieldName, blobDescriptor) -> {
assertThat(blobFieldName).isEqualTo("payload");
descriptors.add(blobDescriptor);
return true;
});
write.write(GenericRow.of(new BlobData(bytes), 1));
write.write(GenericRow.of(null, 1));
commit.commit(write.prepareCommit());
}

// Descriptors are only dereferenced here, after the commit above has run.
assertThat(descriptors).hasSize(2);
assertDescriptor(table, descriptors.get(0), bytes);
// The null row is expected to be reported to the consumer as a null descriptor.
assertThat(descriptors.get(1)).isNull();
}

/**
* Builds a BLOB-format {@link FormatTable} on the local file system under the temp directory,
* with row type {@code (payload BLOB, ds INT)} partitioned by {@code ds}.
*/
private FormatTable createBlobFormatTable() throws Exception {
RowType rowType =
RowType.builder()
.field("payload", DataTypes.BLOB())
.field("ds", DataTypes.INT())
.build();
java.nio.file.Path tableDir = tempPath.resolve("table");
Files.createDirectories(tableDir);
Path tablePath = new Path(tableDir.toUri());
Map<String, String> options = new HashMap<>();
options.put("path", tablePath.toString());
options.put("file.format", "blob");

return FormatTable.builder()
.fileIO(LocalFileIO.create())
.identifier(Identifier.create("test_db", "blob_table"))
.rowType(rowType)
.partitionKeys(Collections.singletonList("ds"))
.location(tablePath.toString())
.format(FormatTable.Format.BLOB)
.options(options)
.build();
}

/**
* Writes {@code bytes} to an {@code external-blob} file in the temp directory and returns a
* {@link Blob} backed by a descriptor covering that whole file (offset 0, length {@code
* bytes.length}).
*/
private Blob createDescriptorBlob(FormatTable table, byte[] bytes) throws Exception {
java.nio.file.Path externalFile = tempPath.resolve("external-blob");
Files.write(externalFile, bytes);
BlobDescriptor descriptor =
new BlobDescriptor(new Path(externalFile.toUri()).toString(), 0, bytes.length);
return Blob.fromDescriptor(UriReader.fromFile(table.fileIO()), descriptor);
}

/**
* Asserts that the descriptor is non-null, has a non-empty URI containing {@code ds=1}, a
* non-negative offset and a length equal to {@code bytes.length}, and that reading through it
* with the table's file IO yields {@code bytes}.
*/
private void assertDescriptor(FormatTable table, BlobDescriptor descriptor, byte[] bytes)
throws Exception {
assertThat(descriptor).isNotNull();
assertThat(descriptor.uri()).isNotEmpty();
assertThat(descriptor.uri()).contains("ds=1");
assertThat(descriptor.offset()).isGreaterThanOrEqualTo(0L);
assertThat(descriptor.length()).isEqualTo(bytes.length);
assertThat(Blob.fromDescriptor(UriReader.fromFile(table.fileIO()), descriptor).toData())
.isEqualTo(bytes);
}

/**
* Reads every row of the table and asserts there are exactly three, in write order: {@code
* first}, {@code second}, then a null blob, each with partition value 1.
*/
private void assertReadRows(FormatTable table, byte[] first, byte[] second) throws Exception {
ReadBuilder readBuilder = table.newReadBuilder();
List<InternalRow> rows = new ArrayList<>();
try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
InternalRowSerializer serializer = new InternalRowSerializer(readBuilder.readType());
// Each row is copied before buffering; presumably the reader may reuse row instances
// between iterations — TODO confirm.
reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
}

assertThat(rows).hasSize(3);
assertThat(rows.get(0).getBlob(0).toData()).isEqualTo(first);
assertThat(rows.get(0).getInt(1)).isEqualTo(1);
assertThat(rows.get(1).getBlob(0).toData()).isEqualTo(second);
assertThat(rows.get(1).getInt(1)).isEqualTo(1);
assertThat(rows.get(2).isNullAt(0)).isTrue();
assertThat(rows.get(2).getInt(1)).isEqualTo(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public Map<String, String> extractOptions(Table table) {
@Override
public boolean supportCloneSplits(String format) {
for (FormatTable.Format supportFormat : FormatTable.Format.values()) {
if (supportFormat.name().equalsIgnoreCase(format)) {
if (supportFormat != FormatTable.Format.BLOB
&& supportFormat.name().equalsIgnoreCase(format)) {
return true;
}
}
Expand Down
Loading
Loading