Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@ public class CatalogContext implements Serializable {
private static final long serialVersionUID = 1L;

private final Options options;
private final SerializableConfiguration hadoopConf;
@Nullable private final SerializableConfiguration hadoopConf;
@Nullable private final FileIOLoader preferIOLoader;
@Nullable private final FileIOLoader fallbackIOLoader;

private CatalogContext(
Options options,
@Nullable Configuration hadoopConf,
boolean loadHadoopConf,
@Nullable FileIOLoader preferIOLoader,
@Nullable FileIOLoader fallbackIOLoader) {
this.options = checkNotNull(options);
this.hadoopConf =
new SerializableConfiguration(
hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf);
hadoopConf == null && !loadHadoopConf
? null
: new SerializableConfiguration(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you extract this into a helper method that wraps the Hadoop configuration loading in a try/catch? If the Hadoop classes are not on the classpath, set the field directly to null.

hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf);
this.preferIOLoader = preferIOLoader;
this.fallbackIOLoader = fallbackIOLoader;
}
Expand All @@ -69,28 +72,39 @@ public static CatalogContext create(Path warehouse) {
}

public static CatalogContext create(Options options) {
return new CatalogContext(options, null, null, null);
return new CatalogContext(options, null, true, null, null);
}

public static CatalogContext create(Options options, Configuration hadoopConf) {
return new CatalogContext(options, hadoopConf, null, null);
return new CatalogContext(options, hadoopConf, true, null, null);
}

public static CatalogContext create(Options options, FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, null, null, fallbackIOLoader);
return new CatalogContext(options, null, true, null, fallbackIOLoader);
}

public static CatalogContext create(
Options options, FileIOLoader preferIOLoader, FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader);
return new CatalogContext(options, null, true, preferIOLoader, fallbackIOLoader);
}

public static CatalogContext create(
Options options,
Configuration hadoopConf,
FileIOLoader preferIOLoader,
FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, hadoopConf, preferIOLoader, fallbackIOLoader);
return new CatalogContext(options, hadoopConf, true, preferIOLoader, fallbackIOLoader);
}

/**
* Create a catalog context without initializing Hadoop configuration.
*
* <p>This should be used only by engines that provide their own {@link FileIOLoader} and do not
* need Paimon's Hadoop-based FileIO path.
*
* <p>The context is built with no Hadoop {@link Configuration} and with Hadoop configuration
* loading switched off, so calling {@link #hadoopConf()} on the returned context throws an
* {@link IllegalStateException}.
*
* @param options catalog options; must not be null
* @param preferIOLoader loader handed to the context as its preferred {@link FileIOLoader}; may
*     be null
* @param fallbackIOLoader loader handed to the context as its fallback {@link FileIOLoader}; may
*     be null
* @return a new context that holds no Hadoop configuration
*/
public static CatalogContext createWithoutHadoop(
Options options, FileIOLoader preferIOLoader, FileIOLoader fallbackIOLoader) {
// hadoopConf = null together with loadHadoopConf = false keeps the constructor from
// building a default Hadoop configuration.
return new CatalogContext(options, null, false, preferIOLoader, fallbackIOLoader);
}

public Options options() {
Expand All @@ -99,6 +113,10 @@ public Options options() {

/**
 * Return hadoop {@link Configuration}.
 *
 * @return the Hadoop configuration held by this context
 * @throws IllegalStateException if this context holds no Hadoop configuration
 */
public Configuration hadoopConf() {
    if (hadoopConf != null) {
        return hadoopConf.get();
    }
    // The field is null only when the context was built without Hadoop configuration.
    throw new IllegalStateException(
            "Hadoop configuration is not available for this CatalogContext.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@
import org.apache.paimon.table.CatalogTableType;
import org.apache.paimon.utils.HadoopUtilsITCase.TestFileIOLoader;
import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;

import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
Expand Down Expand Up @@ -87,6 +93,41 @@ public void testContextDefaultHadoopConf(@TempDir java.nio.file.Path path) {
assertThat(conf.get("dfs.replication")).isEqualTo(replication);
}

/** Verifies that a catalog can be created from a context built without Hadoop configuration. */
@Test
public void testCreateCatalogWithoutHadoop(@TempDir java.nio.file.Path path) {
    Path warehouse = new Path(new Path(path.toUri().toString()), "warehouse");
    Options catalogOptions = new Options();
    catalogOptions.set(WAREHOUSE, warehouse.toString());

    CatalogContext noHadoopContext =
            CatalogContext.createWithoutHadoop(
                    catalogOptions, new TraceableFileIO.Loader(), null);
    Catalog catalog = CatalogFactory.createCatalog(noHadoopContext);

    // A fresh warehouse has no databases; the call itself must work without Hadoop conf.
    assertThat(catalog.listDatabases()).isEmpty();

    // Asking the context for the Hadoop configuration must fail loudly.
    assertThatThrownBy(noHadoopContext::hadoopConf)
            .isInstanceOf(IllegalStateException.class)
            .hasMessageContaining("Hadoop configuration is not available");
}

/**
 * Verifies that a catalog can be created when no {@code org.apache.hadoop.*} class is loadable.
 *
 * <p>The check runs inside {@link NoHadoopCatalogContextRunner}, which is loaded reflectively
 * through a {@link NoHadoopClassLoader} so that every class it touches is resolved by that
 * loader.
 *
 * @param path temporary directory used as the parent of the warehouse
 * @throws Exception the failure raised by the runner, unwrapped from the reflective call
 */
@Test
public void testCreateCatalogWithoutHadoopClasses(@TempDir java.nio.file.Path path)
        throws Exception {
    try (URLClassLoader classLoader = new NoHadoopClassLoader(testClasspathWithoutHadoop())) {
        Class<?> runner =
                Class.forName(NoHadoopCatalogContextRunner.class.getName(), true, classLoader);
        runner.getMethod("run", String.class, ClassLoader.class)
                .invoke(
                        null,
                        new Path(path.toUri().toString(), "warehouse").toString(),
                        classLoader);
    } catch (InvocationTargetException e) {
        // Surface the runner's own failure instead of the reflection wrapper.
        Throwable cause = e.getCause();
        if (cause instanceof Exception) {
            throw (Exception) cause;
        }
        if (cause instanceof Error) {
            throw (Error) cause;
        }
        // Null cause or a Throwable that is neither Exception nor Error: an unchecked cast
        // would mask the real problem, so rethrow the wrapper with everything attached.
        throw e;
    }
}

@Test
public void testContextSerializable() throws IOException, ClassNotFoundException {
Configuration conf = new Configuration(false);
Expand All @@ -97,4 +138,53 @@ public void testContextSerializable() throws IOException, ClassNotFoundException
context = InstantiationUtil.clone(context);
assertThat(context.hadoopConf().get("my_key")).isEqualTo(conf.get("my_key"));
}

/**
 * Builds the URLs of the current {@code java.class.path} with Hadoop-related entries removed.
 *
 * <p>An entry is dropped when its path contains one of the markers {@code /hadoop-}, {@code
 * /htrace-core}, {@code /woodstox-core} or {@code /stax2-api}. Separators are normalized to
 * {@code /} before matching so the filter also works where {@link File#separatorChar} is not
 * {@code /}.
 *
 * @return URLs of the remaining class path entries, in class path order
 */
private static URL[] testClasspathWithoutHadoop() {
    String[] excludedMarkers = {"/hadoop-", "/htrace-core", "/woodstox-core", "/stax2-api"};
    return Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
            .filter(
                    entry -> {
                        // Match on a separator-normalized copy; the entry itself is untouched.
                        String normalized = entry.replace(File.separatorChar, '/');
                        return Arrays.stream(excludedMarkers)
                                .noneMatch(normalized::contains);
                    })
            .map(
                    entry -> {
                        try {
                            return new File(entry).toURI().toURL();
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    })
            .toArray(URL[]::new);
}

/**
 * {@link URLClassLoader} over the given URLs that refuses to load any class whose name starts
 * with {@code org.apache.hadoop.}.
 */
private static class NoHadoopClassLoader extends URLClassLoader {

    private NoHadoopClassLoader(URL[] urls) {
        // Delegate to the system class loader's parent rather than the system loader itself.
        super(urls, ClassLoader.getSystemClassLoader().getParent());
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        boolean hadoopClass = name.startsWith("org.apache.hadoop.");
        if (!hadoopClass) {
            return super.loadClass(name, resolve);
        }
        // Behave as if the Hadoop class were simply absent.
        throw new ClassNotFoundException(name);
    }
}

/** Runner loaded by {@link NoHadoopClassLoader} to verify no-Hadoop catalog creation. */
public static class NoHadoopCatalogContextRunner {

    /**
     * Creates a catalog from a no-Hadoop context and checks that it lists no databases.
     *
     * @param warehouse warehouse location set under the {@code warehouse} option
     * @param classLoader loader expected to reject Hadoop classes; also passed to the catalog
     *     factory
     * @throws Exception if catalog creation or listing fails
     */
    public static void run(String warehouse, ClassLoader classLoader) throws Exception {
        // Precondition: Hadoop's Configuration must not be loadable through this loader.
        assertThatThrownBy(() -> classLoader.loadClass("org.apache.hadoop.conf.Configuration"))
                .isInstanceOf(ClassNotFoundException.class);

        Options catalogOptions = new Options();
        catalogOptions.set("warehouse", warehouse);

        TraceableFileIO.Loader ioLoader = new TraceableFileIO.Loader();
        CatalogContext noHadoopContext =
                CatalogContext.createWithoutHadoop(catalogOptions, ioLoader, null);

        assertThat(CatalogFactory.createCatalog(noHadoopContext, classLoader).listDatabases())
                .isEmpty();
    }
}
}