From 49e5aa5cc6b7c4a05a7e2bc47075d8da12837325 Mon Sep 17 00:00:00 2001 From: psainics Date: Thu, 31 Jul 2025 08:19:31 +0000 Subject: [PATCH] Tink build for 6.10 --- pom.xml | 142 +++++++++--------- .../gcp/crypto/EncryptedFileSystem.java | 83 ++++++++++ 2 files changed, 154 insertions(+), 71 deletions(-) diff --git a/pom.xml b/pom.xml index 68790a49ef..1e7b988d25 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ 3.19.4 1.3.0-rc3 27.0.1-jre - 2.9.2 + 3.3.6 1.4.13 1.4.13 4.5.13 @@ -365,76 +365,6 @@ - - - org.apache.hbase - hbase-shaded-client - ${hbase-shaded-client.version} - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - - - - org.apache.hbase - hbase-shaded-server - ${hbase-shaded-server.version} - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - - - io.dropwizard.metrics - metrics-core - ${dropwizard.metrics-core.version} - - - com.google.cloud - google-cloud-bigquery - ${google.cloud.bigquery.version} - - - com.google.crypto.tink - tink - ${google.tink.version} - - - com.google.crypto.tink - tink-gcpkms - ${google.tink.version} - - - com.google.cloud - google-cloud-spanner - ${google.cloud.spanner.version} - - - com.google.cloud - google-cloud-datastore - ${google.cloud.datastore.version} - org.apache.hadoop hadoop-common @@ -511,6 +441,76 @@ + + + org.apache.hbase + hbase-shaded-client + ${hbase-shaded-client.version} + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + + + org.apache.hbase + hbase-shaded-server + ${hbase-shaded-server.version} + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + + io.dropwizard.metrics + metrics-core + ${dropwizard.metrics-core.version} + + + com.google.cloud + google-cloud-bigquery + ${google.cloud.bigquery.version} + + + com.google.crypto.tink + tink + ${google.tink.version} + + + com.google.crypto.tink + tink-gcpkms + ${google.tink.version} + + + com.google.cloud + google-cloud-spanner + ${google.cloud.spanner.version} + + + com.google.cloud + google-cloud-datastore + ${google.cloud.datastore.version} + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java b/src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java index 7ec6d8ded8..03cc356b70 100644 --- a/src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java +++ b/src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java @@ -22,7 +22,11 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathHandle; +import org.apache.hadoop.fs.impl.OpenFileParameters; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +36,8 @@ import java.nio.channels.Channels; import java.nio.channels.SeekableByteChannel; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; /** * A hadoop {@link FileSystem} that support files decryption (encryption is currently not supported). @@ -42,6 +48,7 @@ public class EncryptedFileSystem extends FilterFileSystem { private static final String FS_SCHEME = CONF_PREFIX + "scheme"; private static final String FS_IMPL = CONF_PREFIX + "impl"; private static final String DECRYPTOR_IMPL = CONF_PREFIX + "decryptor.impl"; + private static final int DEFAULT_BUFFER_SIZE = 4096; private static final Logger LOG = LoggerFactory.getLogger(EncryptedFileSystem.class); @@ -103,6 +110,82 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { return new FSDataInputStream(new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize))); } + /** + * Opens a file asynchronously and returns a {@link FutureDataInputStreamBuilder} + * to build a {@link FSDataInputStream} for the specified {@link Path}. + * + *

This implementation returns a builder that constructs an input stream by using a decryptor + * to open the file through a {@link SeekableByteChannelFSInputStream}. The file is read + * with a buffer size of 4096 bytes.

+ * + * @param path the {@link Path} of the file to open + * @return a {@link FutureDataInputStreamBuilder} that asynchronously builds a {@link FSDataInputStream} + * @throws UnsupportedOperationException if the operation is not supported + */ + @Override + public FutureDataInputStreamBuilder openFile(Path path) throws UnsupportedOperationException { + return new FutureDataInputStreamBuilder() { + @Override + public CompletableFuture build() + throws IllegalArgumentException, UnsupportedOperationException { + return CompletableFuture.supplyAsync(() -> { + try { + return new FSDataInputStream( + new SeekableByteChannelFSInputStream(decryptor.open(fs, path, DEFAULT_BUFFER_SIZE))); + } catch (Exception e) { + throw new CompletionException(e); + } + }); + } + + @Override + public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String s1) { + return this; + } + + @Override + public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String... strings) { + return this; + } + + @Override + public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String s1) { + return this; + } + + @Override + public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String... strings) { + return this; + } + }; + } + + /** + * Opens a file asynchronously using the provided {@link Path}, and returns + * a {@link CompletableFuture} that supplies a {@link FSDataInputStream}. + * + *

This method uses a decryptor to open the file and wraps it in a {@link SeekableByteChannelFSInputStream}. + * It uses the buffer size specified in the {@code parameters}; if the buffer size is not greater than zero, + * a default of 4096 bytes is used.

+ * + * @param path the {@link Path} to the file to open + * @param parameters the {@link OpenFileParameters} containing optional configuration, such as buffer size + * @return a {@link CompletableFuture} that will complete with the {@link FSDataInputStream} + * @throws CompletionException if an exception occurs during file opening + */ + @Override + protected CompletableFuture openFileWithOptions(Path path, OpenFileParameters parameters) { + return CompletableFuture.supplyAsync(() -> { + try { + int bufferSize = parameters.getBufferSize() > 0 ? parameters.getBufferSize() : 4096; + return new FSDataInputStream( + new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize))); + } catch (Exception e) { + throw new CompletionException(e); + } + }); + } + /** * A {@link FSInputStream} implementation backed by a {@link SeekableByteChannel}. */