diff --git a/docs/generated/core_configuration.html b/docs/generated/core_configuration.html index 45e47fdf2368..6b207576efa7 100644 --- a/docs/generated/core_configuration.html +++ b/docs/generated/core_configuration.html @@ -108,7 +108,7 @@
blob-write-null-on-missing-file
false Boolean - Whether to write NULL for a descriptor BLOB value when the referenced file does not exist during Flink writes. When false, the write fails when the descriptor is read. + Whether to write NULL for a descriptor BLOB value when the referenced file or HTTP resource does not exist during Flink writes. When false, the write fails when the descriptor is read.
blob.split-by-file-size
@@ -903,6 +903,24 @@ Integer Level threshold of lookup to generate remote lookup files. Level files below this threshold will not generate remote lookup files. + +
manifest-sort.enabled
+ false + Boolean + Whether to invoke manifest sort rewrite during commit.
Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered. + + +
manifest-sort.max-rewrite-size
+ 256 mb + MemorySize + Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it. + + +
manifest-sort.partition-field
+ (none) + String + Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field. +
manifest.compression
"zstd" @@ -933,24 +951,6 @@ Integer To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.
Note: when 'manifest-sort.enabled' is true, this minimum-count gate is only applied to the trailing sub-segment of a section that exceeds 'manifest-sort.max-rewrite-size'. Small under-budget sections are sorted and rewritten directly, so two small manifest files may be merged into one even when their count is below this threshold and full compaction is not triggered. - -
manifest-sort.enabled
- false - Boolean - Whether to invoke manifest sort rewrite during commit.
Note: enabling this changes the semantics of 'manifest.merge-min-count'. In the sort rewrite path, small manifest files within the rewrite budget are sorted and merged directly, so the minimum-count gate no longer prevents merging a small number of under-budget manifest files when full compaction is not triggered. - - -
manifest-sort.partition-field
- (none) - String - Partition field name to sort manifest entries by. Validated by schema validation, if not configured, defaults to the first partition field. - - -
manifest-sort.max-rewrite-size
- 256 mb - MemorySize - Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting. The cap only limits the sorted rewrite portion and full/minor cleanup may still happen beyond it. -
manifest.target-file-size
8 mb diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 4a4b7b5f8123..2cd6c8a51352 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2453,8 +2453,9 @@ public InlineElement getDescription() { .defaultValue(false) .withDescription( "Whether to write NULL for a descriptor BLOB value when the " - + "referenced file does not exist during Flink writes. When " - + "false, the write fails when the descriptor is read."); + + "referenced file or HTTP resource does not exist during Flink " + + "writes. When false, the write fails when the descriptor is " + + "read."); @Immutable public static final ConfigOption BLOB_EXTERNAL_STORAGE_PATH = diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java index 9447f2417190..6962f21bc543 100644 --- a/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java +++ b/paimon-api/src/main/java/org/apache/paimon/rest/HttpClientUtils.java @@ -22,6 +22,7 @@ import org.apache.paimon.rest.interceptor.TimingInterceptor; import org.apache.hc.client5.http.classic.methods.HttpGet; +import org.apache.hc.client5.http.classic.methods.HttpHead; import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; @@ -31,6 +32,7 @@ import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.client5.http.ssl.HttpsSupport; +import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.reactor.ssl.SSLBufferMode; import org.apache.hc.core5.ssl.SSLContexts; import org.apache.hc.core5.util.Timeout; @@ -86,9 +88,65 @@ private static HttpClientConnectionManager configureConnectionManager() { public static InputStream getAsInputStream(String uri) throws IOException { HttpGet httpGet = new HttpGet(uri); CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpGet); - if (response.getCode() != 200) { - throw new RuntimeException("HTTP error code: " + response.getCode()); + int statusCode = response.getCode(); + if (statusCode != HttpStatus.SC_OK) { + throw httpError(statusCode); } return response.getEntity().getContent(); } + + /** + * Checks whether an HTTP resource exists. HEAD is attempted first; when HEAD does not return + * 200, a lightweight GET with {@code Range: bytes=0-0} is used to verify readability. This + * avoids treating signed or GET-only URLs as missing when HEAD is rejected or returns a + * different status than GET. + */ + public static boolean exists(String uri) throws IOException { + int headStatusCode = headStatusCode(uri); + if (headStatusCode == HttpStatus.SC_OK) { + return true; + } + int rangeStatusCode = getRangeStatusCode(uri); + if (rangeStatusCode == HttpStatus.SC_OK + || rangeStatusCode == HttpStatus.SC_PARTIAL_CONTENT) { + return true; + } + if (rangeStatusCode == HttpStatus.SC_NOT_FOUND) { + return false; + } + throw new IOException( + "Unexpected HTTP status code: " + rangeStatusCode + " for uri: " + uri); + } + + public static boolean isNotFoundError(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof RuntimeException + && current.getMessage() != null + && current.getMessage().startsWith("HTTP error code: 404")) { + return true; + } + current = current.getCause(); + } + return false; + } + + private static int headStatusCode(String uri) throws IOException { + HttpHead httpHead = new HttpHead(uri); + try (CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpHead)) { + return response.getCode(); + } + } + + private static int getRangeStatusCode(String uri) throws IOException { + HttpGet httpGet = new HttpGet(uri); + httpGet.addHeader("Range", "bytes=0-0"); + try (CloseableHttpResponse response = DEFAULT_HTTP_CLIENT.execute(httpGet)) { + return response.getCode(); + } + } + + private static RuntimeException httpError(int statusCode) { + return new RuntimeException("HTTP error code: " + statusCode); + } } diff --git a/paimon-api/src/test/java/org/apache/paimon/rest/HttpClientUtilsTest.java b/paimon-api/src/test/java/org/apache/paimon/rest/HttpClientUtilsTest.java new file mode 100644 index 000000000000..5442d9e1d040 --- /dev/null +++ b/paimon-api/src/test/java/org/apache/paimon/rest/HttpClientUtilsTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link HttpClientUtils}. */ +public class HttpClientUtilsTest { + + private HttpServer server; + private int port; + + @BeforeEach + public void setUp() throws Exception { + server = HttpServer.create(new InetSocketAddress(0), 0); + port = server.getAddress().getPort(); + server.start(); + } + + @AfterEach + public void tearDown() { + if (server != null) { + server.stop(0); + } + } + + @Test + public void testExistsReturnsTrueForAvailableResource() throws Exception { + registerHandler( + "/ok", + exchange -> { + respond(exchange, 200, "abc".getBytes()); + }); + + assertThat(HttpClientUtils.exists(url("/ok"))).isTrue(); + } + + @Test + public void testExistsReturnsFalseForMissingResource() throws Exception { + registerHandler( + "/missing", + exchange -> { + respond(exchange, 404, new byte[0]); + }); + + assertThat(HttpClientUtils.exists(url("/missing"))).isFalse(); + } + + @Test + public void testExistsFallsBackToRangeGetWhenHeadNotAllowed() throws Exception { + registerHandler( + "/no-head", + exchange -> { + if ("HEAD".equals(exchange.getRequestMethod())) { + respond(exchange, 405, new byte[0]); + return; + } + respond(exchange, 200, "abc".getBytes()); + }); + + assertThat(HttpClientUtils.exists(url("/no-head"))).isTrue(); + } + + @Test + public void testExistsFallsBackToRangeGetWhenHeadReturnsNotFound() throws Exception { + registerHandler( + "/head-404-get-ok", + exchange -> { + if ("HEAD".equals(exchange.getRequestMethod())) { + respond(exchange, 404, new byte[0]); + return; + } + if ("GET".equals(exchange.getRequestMethod()) + && exchange.getRequestHeaders().getFirst("Range") != null) { + respond(exchange, 206, "abc".getBytes()); + return; + } + respond(exchange, 404, new byte[0]); + }); + + assertThat(HttpClientUtils.exists(url("/head-404-get-ok"))).isTrue(); + } + + @Test + public void testExistsFallsBackToRangeGetWhenHeadReturnsForbidden() throws Exception { + registerHandler( + "/head-403-get-ok", + exchange -> { + if ("HEAD".equals(exchange.getRequestMethod())) { + respond(exchange, 403, new byte[0]); + return; + } + if ("GET".equals(exchange.getRequestMethod()) + && exchange.getRequestHeaders().getFirst("Range") != null) { + respond(exchange, 200, "abc".getBytes()); + return; + } + respond(exchange, 403, new byte[0]); + }); + + assertThat(HttpClientUtils.exists(url("/head-403-get-ok"))).isTrue(); + } + + @Test + public void testExistsReturnsFalseOnlyWhenRangeGetAlsoNotFound() throws Exception { + registerHandler( + "/head-404-get-404", + exchange -> { + if ("HEAD".equals(exchange.getRequestMethod())) { + respond(exchange, 404, new byte[0]); + return; + } + respond(exchange, 404, new byte[0]); + }); + + assertThat(HttpClientUtils.exists(url("/head-404-get-404"))).isFalse(); + } + + @Test + public void testGetAsInputStreamThrowsForNotFound() { + registerHandler( + "/get-missing", + exchange -> { + respond(exchange, 404, new byte[0]); + }); + + assertThatThrownBy(() -> HttpClientUtils.getAsInputStream(url("/get-missing"))) + .isInstanceOf(RuntimeException.class) + .hasMessage("HTTP error code: 404"); + } + + @Test + public void testIsNotFoundError() { + RuntimeException exception = + new RuntimeException("wrapper", new RuntimeException("HTTP error code: 404")); + assertThat(HttpClientUtils.isNotFoundError(exception)).isTrue(); + assertThat(HttpClientUtils.isNotFoundError(new RuntimeException("HTTP error code: 500"))) + .isFalse(); + } + + private void registerHandler(String path, HttpHandler handler) { + server.createContext(path, handler); + } + + private String url(String path) { + return "http://127.0.0.1:" + port + path; + } + + private static void respond(HttpExchange exchange, int statusCode, byte[] body) + throws IOException { + boolean headRequest = "HEAD".equals(exchange.getRequestMethod()); + long responseLength = headRequest ? -1 : body.length; + exchange.sendResponseHeaders(statusCode, responseLength); + if (!headRequest && body.length > 0) { + try (OutputStream outputStream = exchange.getResponseBody()) { + outputStream.write(body); + } + } else { + exchange.close(); + } + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java index 6c0b97a25cfe..236bd6b6f697 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReader.java @@ -64,5 +64,9 @@ class HttpUriReader implements UriReader { public SeekableInputStream newInputStream(String uri) throws IOException { return SeekableInputStream.wrap(HttpClientUtils.getAsInputStream(uri)); } + + public boolean exists(String uri) throws IOException { + return HttpClientUtils.exists(uri); + } } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java index 83c16351e0d2..a2fd2d4ad356 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java @@ -51,8 +51,13 @@ public UriReader create(String input) { public boolean exists(String input) throws IOException { UriReader reader = create(input); - return !(reader instanceof UriReader.FileUriReader) - || ((UriReader.FileUriReader) reader).exists(input); + if (reader instanceof UriReader.FileUriReader) { + return ((UriReader.FileUriReader) reader).exists(input); + } + if (reader instanceof UriReader.HttpUriReader) { + return ((UriReader.HttpUriReader) reader).exists(input); + } + return true; } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java index 290b7f976e15..3cf1cbaf3e5d 100644 --- a/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/utils/UriReaderFactoryTest.java @@ -23,9 +23,16 @@ import org.apache.paimon.utils.UriReader.FileUriReader; import org.apache.paimon.utils.UriReader.HttpUriReader; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.file.Files; import static org.assertj.core.api.Assertions.assertThat; @@ -38,6 +45,23 @@ public class UriReaderFactoryTest { @TempDir java.nio.file.Path tempPath; + private HttpServer httpServer; + private int httpPort; + + @BeforeEach + public void setUpHttpServer() throws Exception { + httpServer = HttpServer.create(new InetSocketAddress(0), 0); + httpPort = httpServer.getAddress().getPort(); + httpServer.start(); + } + + @AfterEach + public void tearDownHttpServer() { + if (httpServer != null) { + httpServer.stop(0); + } + } + @Test public void testCreateHttpUriReader() { UriReader reader = factory.create("http://example.com/file.txt"); @@ -93,8 +117,25 @@ public void testExistsUsesCachedFileUriReader() throws Exception { } @Test - public void testExistsSkipsHttpUriReader() throws Exception { - assertThat(factory.exists("https://example.com/missing.txt")).isTrue(); + public void testExistsReturnsFalseForMissingHttpResource() throws Exception { + registerHttpHandler( + "/missing.txt", + exchange -> { + sendResponse(exchange, 404, new byte[0]); + }); + + assertThat(factory.exists(httpUrl("/missing.txt"))).isFalse(); + } + + @Test + public void testExistsReturnsTrueForAvailableHttpResource() throws Exception { + registerHttpHandler( + "/ok.txt", + exchange -> { + sendResponse(exchange, 200, "ok".getBytes()); + }); + + assertThat(factory.exists(httpUrl("/ok.txt"))).isTrue(); } @Test @@ -104,4 +145,26 @@ public void testReadersReinitializedAfterDeserialization() throws Exception { UriReader reader2 = deserializedFactory.create("http://my_bucket/path/to/file2.txt"); assertThat(reader1).isSameAs(reader2); } + + private void registerHttpHandler(String path, com.sun.net.httpserver.HttpHandler handler) { + httpServer.createContext(path, handler); + } + + private String httpUrl(String path) { + return "http://127.0.0.1:" + httpPort + path; + } + + private static void sendResponse(HttpExchange exchange, int statusCode, byte[] body) + throws IOException { + boolean headRequest = "HEAD".equals(exchange.getRequestMethod()); + long responseLength = headRequest ? -1 : body.length; + exchange.sendResponseHeaders(statusCode, responseLength); + if (!headRequest && body.length > 0) { + try (OutputStream outputStream = exchange.getResponseBody()) { + outputStream.write(body); + } + } else { + exchange.close(); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java index 3be374ff9dca..8fc8f0c03f86 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/DedicatedFormatRollingFileWriter.java @@ -202,7 +202,8 @@ public DedicatedFormatRollingFileWriter( statsDenseStore, blobTargetFileSize, context.blobConsumer(), - context.blobInlineFields()); + context.blobInlineFields(), + context.writeNullOnMissingFile()); } else { this.blobWriterFactory = null; } @@ -220,7 +221,8 @@ public DedicatedFormatRollingFileWriter( fileSource, asyncFileWrite, statsDenseStore, - blobTargetFileSize); + blobTargetFileSize, + context.writeNullOnMissingFile()); } else { this.externalStorageBlobWriter = null; } diff --git a/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java index 843506dd6dbb..37fb8f01e46c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/ExternalStorageBlobWriter.java @@ -86,7 +86,8 @@ public ExternalStorageBlobWriter( FileSource fileSource, boolean asyncFileWrite, boolean statsDenseStore, - long targetFileSize) { + long targetFileSize, + boolean writeNullOnMissingFile) { checkNotNull( externalStoragePath, "'%s' must be set when '%s' is configured.", @@ -105,7 +106,8 @@ public ExternalStorageBlobWriter( fileSource, asyncFileWrite, statsDenseStore, - targetFileSize); + targetFileSize, + writeNullOnMissingFile); this.uriReader = UriReader.fromFile(fileIO); int fieldCount = writeSchema.getFieldCount(); @@ -167,7 +169,8 @@ private static List createFieldWriters( FileSource fileSource, boolean asyncFileWrite, boolean statsDenseStore, - long targetFileSize) { + long targetFileSize, + boolean writeNullOnMissingFile) { List writers = new ArrayList<>(); for (DataField field : writeSchema.getFields()) { if (field.type().getTypeRoot() == DataTypeRoot.BLOB @@ -184,7 +187,8 @@ private static List createFieldWriters( fileSource, asyncFileWrite, statsDenseStore, - targetFileSize)); + targetFileSize, + writeNullOnMissingFile)); } } return writers; @@ -201,12 +205,14 @@ private static ExternalStorageBlobFieldWriter createFieldWriter( FileSource fileSource, boolean asyncFileWrite, boolean statsDenseStore, - long targetFileSize) { + long targetFileSize, + boolean writeNullOnMissingFile) { int fieldIndex = writeSchema.getFieldIndex(fieldName); ExternalStorageBlobFieldWriter fieldWriter = new ExternalStorageBlobFieldWriter(fieldIndex); BlobFileFormat blobFileFormat = new BlobFileFormat(); blobFileFormat.setWriteConsumer(fieldWriter); + blobFileFormat.setWriteNullOnMissingFile(writeNullOnMissingFile); RowType projectedType = writeSchema.project(fieldName); fieldWriter.setWriter( diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java index 045af6ff7565..7c6a513dbe0e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java @@ -63,12 +63,14 @@ public MultipleBlobFileWriter( boolean statsDenseStore, long targetFileSize, @Nullable BlobConsumer blobConsumer, - Set blobInlineFields) { + Set blobInlineFields, + boolean writeNullOnMissingFile) { RowType blobRowType = new RowType(fieldsInBlobFile(writeSchema, blobInlineFields)); this.blobWriters = new ArrayList<>(); for (String blobFieldName : blobRowType.getFieldNames()) { BlobFileFormat blobFileFormat = new BlobFileFormat(); blobFileFormat.setWriteConsumer(blobConsumer); + blobFileFormat.setWriteNullOnMissingFile(writeNullOnMissingFile); blobWriters.add( new BlobProjectedFileWriter( () -> diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java index 355e4781eb5a..17633e1b0ff2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BlobFileContext.java @@ -37,6 +37,7 @@ public class BlobFileContext { private final Set blobInlineFields; private final Set blobExternalStorageFields; @Nullable private final String blobExternalStoragePath; + private final boolean writeNullOnMissingFile; private @Nullable BlobConsumer blobConsumer; @@ -44,11 +45,13 @@ private BlobFileContext( Set blobDescriptorFields, Set blobInlineFields, Set blobExternalStorageFields, - @Nullable String blobExternalStoragePath) { + @Nullable String blobExternalStoragePath, + boolean writeNullOnMissingFile) { this.blobDescriptorFields = blobDescriptorFields; this.blobInlineFields = blobInlineFields; this.blobExternalStorageFields = blobExternalStorageFields; this.blobExternalStoragePath = blobExternalStoragePath; + this.writeNullOnMissingFile = writeNullOnMissingFile; } @Nullable @@ -74,7 +77,11 @@ public static BlobFileContext create(RowType rowType, CoreOptions options) { return null; } return new BlobFileContext( - descriptorFields, inlineFields, externalStorageField, externalStoragePath); + descriptorFields, + inlineFields, + externalStorageField, + externalStoragePath, + options.blobWriteNullOnMissingFile()); } public BlobFileContext withBlobConsumer(BlobConsumer blobConsumer) { @@ -110,4 +117,8 @@ public String blobExternalStoragePath() { public BlobConsumer blobConsumer() { return blobConsumer; } + + public boolean writeNullOnMissingFile() { + return writeNullOnMissingFile; + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java index 1ca0ed0422e5..464dd4b8b091 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkRowWrapperTest.java @@ -23,10 +23,17 @@ import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.options.Options; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; import org.apache.flink.table.data.GenericRowData; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.file.Files; import static org.assertj.core.api.Assertions.assertThat; @@ -36,6 +43,23 @@ public class FlinkRowWrapperTest { @TempDir java.nio.file.Path tempPath; + private HttpServer httpServer; + private int httpPort; + + @BeforeEach + public void setUpHttpServer() throws Exception { + httpServer = HttpServer.create(new InetSocketAddress(0), 0); + httpPort = httpServer.getAddress().getPort(); + httpServer.start(); + } + + @AfterEach + public void tearDownHttpServer() { + if (httpServer != null) { + httpServer.stop(0); + } + } + @Test public void testMissingBlobDescriptorIsNullWhenCheckingEnabled() { java.nio.file.Path missing = tempPath.resolve("missing.blob"); @@ -59,6 +83,36 @@ public void testExistingBlobDescriptorIsReadableWhenCheckingEnabled() throws Exc assertThat(wrapper.getBlob(0).toData()).isEqualTo(bytes); } + @Test + public void testMissingHttpBlobDescriptorIsNullWhenCheckingEnabled() throws Exception { + httpServer.createContext( + "/missing.jpg", + exchange -> { + sendResponse(exchange, 404, new byte[0]); + }); + GenericRowData row = descriptorRow("http://127.0.0.1:" + httpPort + "/missing.jpg", 1); + + FlinkRowWrapper wrapper = wrapper(row, true); + + assertThat(wrapper.isNullAt(0)).isTrue(); + } + + @Test + public void testExistingHttpBlobDescriptorIsReadableWhenCheckingEnabled() throws Exception { + byte[] bytes = new byte[] {1, 2, 3}; + httpServer.createContext( + "/ok.jpg", + exchange -> { + sendResponse(exchange, 200, bytes); + }); + GenericRowData row = + descriptorRow("http://127.0.0.1:" + httpPort + "/ok.jpg", bytes.length); + + FlinkRowWrapper wrapper = wrapper(row, true); + + assertThat(wrapper.isNullAt(0)).isFalse(); + } + @Test public void testMissingBlobDescriptorUsesDefaultBehaviorWithoutChecking() { java.nio.file.Path missing = tempPath.resolve("missing.blob"); @@ -72,8 +126,25 @@ public void testMissingBlobDescriptorUsesDefaultBehaviorWithoutChecking() { } private GenericRowData descriptorRow(java.nio.file.Path path, long length) { - return GenericRowData.of( - new BlobDescriptor(path.toUri().toString(), 0, length).serialize()); + return descriptorRow(path.toUri().toString(), length); + } + + private GenericRowData descriptorRow(String uri, long length) { + return GenericRowData.of(new BlobDescriptor(uri, 0, length).serialize()); + } + + private static void sendResponse(HttpExchange exchange, int statusCode, byte[] body) + throws IOException { + boolean headRequest = "HEAD".equals(exchange.getRequestMethod()); + long responseLength = headRequest ? -1 : body.length; + exchange.sendResponseHeaders(statusCode, responseLength); + if (!headRequest && body.length > 0) { + try (OutputStream outputStream = exchange.getResponseBody()) { + outputStream.write(body); + } + } else { + exchange.close(); + } } private FlinkRowWrapper wrapper(GenericRowData row, boolean checkBlobDescriptorExists) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java index c65f76eeaa9d..a93355fbc64a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFileFormat.java @@ -50,6 +50,7 @@ public class BlobFileFormat extends FileFormat { private final boolean blobAsDescriptor; + private boolean writeNullOnMissingFile; @Nullable public BlobConsumer writeConsumer; @@ -62,6 +63,10 @@ public BlobFileFormat(boolean blobAsDescriptor) { this.blobAsDescriptor = blobAsDescriptor; } + public void setWriteNullOnMissingFile(boolean writeNullOnMissingFile) { + this.writeNullOnMissingFile = writeNullOnMissingFile; + } + public static boolean isBlobFile(String fileName) { return fileName.endsWith("." + BlobFileFormatFactory.IDENTIFIER); } @@ -108,7 +113,7 @@ private BlobFormatWriterFactory(RowType type) { @Override public FormatWriter create(PositionOutputStream out, String compression) { - return new BlobFormatWriter(out, writeConsumer, type); + return new BlobFormatWriter(out, writeConsumer, type, writeNullOnMissingFile); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java index f97158c17186..d61a549ef962 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/blob/BlobFormatWriter.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BlobConsumer; import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.BlobPlaceholder; +import org.apache.paimon.data.BlobRef; import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FileAwareFormatWriter; import org.apache.paimon.format.FormatWriter; @@ -32,6 +33,9 @@ import org.apache.paimon.utils.DeltaVarintCompressor; import org.apache.paimon.utils.LongArrayList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; @@ -44,6 +48,8 @@ /** {@link FormatWriter} for blob file. */ public class BlobFormatWriter implements FileAwareFormatWriter { + private static final Logger LOG = LoggerFactory.getLogger(BlobFormatWriter.class); + public static final byte VERSION = 1; public static final int MAGIC_NUMBER = 1481511375; public static final byte[] MAGIC_NUMBER_BYTES = intToLittleEndian(MAGIC_NUMBER); @@ -53,6 +59,7 @@ public class BlobFormatWriter implements FileAwareFormatWriter { private final PositionOutputStream out; @Nullable private final BlobConsumer writeConsumer; private final String blobFieldName; + private final boolean writeNullOnMissingFile; private final CRC32 crc32; private final byte[] tmpBuffer; private final LongArrayList lengths; @@ -61,8 +68,17 @@ public class BlobFormatWriter implements FileAwareFormatWriter { public BlobFormatWriter( PositionOutputStream out, @Nullable BlobConsumer writeConsumer, RowType type) { + this(out, writeConsumer, type, false); + } + + public BlobFormatWriter( + PositionOutputStream out, + @Nullable BlobConsumer writeConsumer, + RowType type, + boolean writeNullOnMissingFile) { this.out = out; this.writeConsumer = writeConsumer; + this.writeNullOnMissingFile = writeNullOnMissingFile; checkArgument(type.getFieldCount() == 1, "BlobFormatWriter only support one field."); this.blobFieldName = type.getFieldNames().get(0); this.crc32 = new CRC32(); @@ -96,22 +112,36 @@ public void addElement(InternalRow element) throws IOException { return; } - long previousPos = out.getPos(); - crc32.reset(); + SeekableInputStream in; + try { + in = blob.newInputStream(); + } catch (IOException | RuntimeException e) { + if (writeNullOnMissingFile && isNotFoundError(e)) { + LOG.warn( + "Failed to open blob from {}, writing NULL for BLOB field {}.", + blobUri(blob), + blobFieldName, + e); + writeNullElement(); + return; + } + throw e; + } + crc32.reset(); write(MAGIC_NUMBER_BYTES); - long blobPos = out.getPos(); - try (SeekableInputStream in = blob.newInputStream()) { - int bytesRead = in.read(tmpBuffer); + long blobLength = 0; + try (SeekableInputStream stream = in) { + int bytesRead = stream.read(tmpBuffer); while (bytesRead >= 0) { write(tmpBuffer, bytesRead); - bytesRead = in.read(tmpBuffer); + blobLength += bytesRead; + bytesRead = stream.read(tmpBuffer); } } - long blobLength = out.getPos() - blobPos; - long binLength = out.getPos() - previousPos + 12; + long binLength = blobLength + MAGIC_NUMBER_BYTES.length + 12; lengths.add(binLength); byte[] lenBytes = longToLittleEndian(binLength); write(lenBytes); @@ -127,6 +157,33 @@ public void addElement(InternalRow element) throws IOException { } } + private void writeNullElement() throws IOException { + lengths.add(NULL_LENGTH); + if (writeConsumer != null) { + writeConsumer.accept(blobFieldName, null); + } + } + + private static String blobUri(Blob blob) { + if (blob instanceof BlobRef) { + return ((BlobRef) blob).toDescriptor().uri(); + } + return "unknown"; + } + + private static boolean isNotFoundError(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof RuntimeException + && current.getMessage() != null + && current.getMessage().startsWith("HTTP error code: 404")) { + return true; + } + current = current.getCause(); + } + return false; + } + private void write(byte[] bytes) throws IOException { write(bytes, bytes.length); } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFormatWriterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFormatWriterTest.java new file mode 100644 index 000000000000..1747c74869a2 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/blob/BlobFormatWriterTest.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.blob; + +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; +import org.apache.paimon.data.BlobRef; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.reader.FileRecordIterator; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.UriReader; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link BlobFormatWriter}. */ +public class BlobFormatWriterTest { + + @Test + public void testWriteNullOnMissingFileFallbackForHttpNotFound( + @TempDir java.nio.file.Path tempDir) throws Exception { + RowType rowType = RowType.of(DataTypes.BLOB()); + java.nio.file.Path outputFile = tempDir.resolve("blob.out"); + BlobFormatWriter writer = + new BlobFormatWriter( + new LocalFileIO.LocalPositionOutputStream(outputFile.toFile()), + null, + rowType, + true); + + writer.addElement( + GenericRow.of( + new BlobRef( + failingHttpReader(), + new BlobDescriptor("https://example.com/missing.jpg", 0, -1)))); + + writer.close(); + + assertThat(outputFile.toFile()).exists(); + } + + @Test + public void testMissingHttpBlobFollowedByValidBlobPreservesReadback( + @TempDir java.nio.file.Path tempDir) throws Exception { + RowType rowType = RowType.of(DataTypes.BLOB()); + java.nio.file.Path outputFile = tempDir.resolve("blob.out"); + byte[] validPayload = "valid-blob-content".getBytes(); + + BlobFormatWriter writer = + new BlobFormatWriter( + new LocalFileIO.LocalPositionOutputStream(outputFile.toFile()), + null, + rowType, + true); + writer.addElement( + GenericRow.of( + new BlobRef( + failingHttpReader(), + new BlobDescriptor("https://example.com/missing.jpg", 0, -1)))); + writer.addElement(GenericRow.of(Blob.fromData(validPayload))); + writer.close(); + + LocalFileIO fileIO = new LocalFileIO(); + Path filePath = new Path(outputFile.toUri()); + long fileSize = Files.size(outputFile); + try (SeekableInputStream in = fileIO.newInputStream(filePath)) { + BlobFileMeta fileMeta = new BlobFileMeta(in, fileSize, null); + assertThat(fileMeta.recordNumber()).isEqualTo(2); + assertThat(fileMeta.isNull(0)).isTrue(); + assertThat(fileMeta.isNull(1)).isFalse(); + + BlobFormatReader reader = new BlobFormatReader(fileIO, filePath, fileMeta, in, 1, 0); + FileRecordIterator iterator = reader.readBatch(); + InternalRow nullRow = iterator.next(); + InternalRow validRow = iterator.next(); + + assertThat(nullRow.isNullAt(0)).isTrue(); + Blob readBlob = validRow.getBlob(0); + try (SeekableInputStream blobIn = readBlob.newInputStream()) { + byte[] actual = new byte[validPayload.length]; + org.apache.paimon.utils.IOUtils.readFully(blobIn, actual); + assertThat(actual).isEqualTo(validPayload); + } + } + } + + @Test + public void testTwoConsecutiveBlobsPreserveReadback(@TempDir java.nio.file.Path tempDir) + throws Exception { + RowType rowType = RowType.of(DataTypes.BLOB()); + java.nio.file.Path outputFile = tempDir.resolve("blob.out"); + byte[] firstPayload = "first-blob".getBytes(); + byte[] secondPayload = "second-blob-payload".getBytes(); + + BlobFormatWriter writer = + new BlobFormatWriter( + new LocalFileIO.LocalPositionOutputStream(outputFile.toFile()), + null, + rowType, + true); + writer.addElement(GenericRow.of(Blob.fromData(firstPayload))); + writer.addElement(GenericRow.of(Blob.fromData(secondPayload))); + writer.close(); + + LocalFileIO fileIO = new LocalFileIO(); + Path filePath = new Path(outputFile.toUri()); + long fileSize = Files.size(outputFile); + try (SeekableInputStream in = fileIO.newInputStream(filePath)) { + BlobFileMeta fileMeta = new BlobFileMeta(in, fileSize, null); + assertThat(fileMeta.recordNumber()).isEqualTo(2); + + BlobFormatReader reader = new BlobFormatReader(fileIO, filePath, fileMeta, in, 1, 0); + FileRecordIterator iterator = reader.readBatch(); + assertBlobPayload(iterator.next().getBlob(0), firstPayload); + assertBlobPayload(iterator.next().getBlob(0), secondPayload); + } + } + + private static void assertBlobPayload(Blob blob, byte[] expected) throws Exception { + try (SeekableInputStream blobIn = blob.newInputStream()) { + byte[] actual = new byte[expected.length]; + org.apache.paimon.utils.IOUtils.readFully(blobIn, actual); + assertThat(actual).isEqualTo(expected); + } + } + + @Test + public void testWriteNullOnMissingFileDisabledPropagatesHttpNotFound( + @TempDir java.nio.file.Path tempDir) throws Exception { + RowType rowType = RowType.of(DataTypes.BLOB()); + BlobFormatWriter writer = + new BlobFormatWriter( + new LocalFileIO.LocalPositionOutputStream( + tempDir.resolve("blob.out").toFile()), + null, + rowType, + false); + + assertThatThrownBy( + () -> + writer.addElement( + GenericRow.of( + new BlobRef( + failingHttpReader(), + new BlobDescriptor( + "https://example.com/missing.jpg", + 0, + -1))))) + .isInstanceOf(RuntimeException.class) + .hasMessage("HTTP error code: 404"); + } + + private static UriReader failingHttpReader() { + return new UriReader() { + @Override + public SeekableInputStream newInputStream(String uri) { + throw new RuntimeException("HTTP error code: 404"); + } + }; + } +}