From 7afe72b0ee2a8d8a692b561649648c7977b9f193 Mon Sep 17 00:00:00 2001 From: Dmitriy Fingerman Date: Mon, 19 Jan 2026 18:01:04 -0500 Subject: [PATCH] HIVE-29391: Enable Independent Scaling of HMS REST Catalog from HMS --- .../cli/TestStandaloneRESTCatalogServer.java | 227 ++++++++++++++++++ .../iceberg/rest/HMSCatalogFactory.java | 11 +- .../StandaloneRESTCatalogServer.java | 210 ++++++++++++++++ .../hadoop/hive/metastore/HiveMetaStore.java | 12 +- 4 files changed, 457 insertions(+), 3 deletions(-) create mode 100644 itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java create mode 100644 standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java diff --git a/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java new file mode 100644 index 000000000000..a5ec398d4b2b --- /dev/null +++ b/itests/qtest-iceberg/src/test/java/org/apache/hadoop/hive/cli/TestStandaloneRESTCatalogServer.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.cli; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.hadoop.hive.metastore.MetaStoreTestUtils; +import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.iceberg.rest.standalone.StandaloneRESTCatalogServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Integration test for Standalone REST Catalog Server. + * + * Tests that the standalone server can: + * 1. Start independently of HMS + * 2. Connect to an external HMS instance + * 3. Serve REST Catalog requests + * 4. Provide health check endpoint + */ +public class TestStandaloneRESTCatalogServer { + private static final Logger LOG = LoggerFactory.getLogger(TestStandaloneRESTCatalogServer.class); + + private Configuration hmsConf; + private Configuration restCatalogConf; + private int hmsPort; + private StandaloneRESTCatalogServer restCatalogServer; + private File warehouseDir; + private File hmsTempDir; + + @Before + public void setup() throws Exception { + // Setup temporary directories + hmsTempDir = new File(System.getProperty("java.io.tmpdir"), "test-hms-" + System.currentTimeMillis()); + hmsTempDir.mkdirs(); + warehouseDir = new File(hmsTempDir, "warehouse"); + warehouseDir.mkdirs(); + + // Configure and start embedded HMS + hmsConf = MetastoreConf.newMetastoreConf(); + MetaStoreTestUtils.setConfForStandloneMode(hmsConf); + + String jdbcUrl = String.format("jdbc:derby:memory:%s;create=true", + new File(hmsTempDir, "metastore_db").getAbsolutePath()); + MetastoreConf.setVar(hmsConf, ConfVars.CONNECT_URL_KEY, jdbcUrl); + MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE, warehouseDir.getAbsolutePath()); + MetastoreConf.setVar(hmsConf, ConfVars.WAREHOUSE_EXTERNAL, warehouseDir.getAbsolutePath()); + + // Start HMS + hmsPort = MetaStoreTestUtils.startMetaStoreWithRetry( + HadoopThriftAuthBridge.getBridge(), hmsConf, true, false, false, false); + LOG.info("Started embedded HMS on port: {}", hmsPort); + + // Configure standalone REST Catalog server + restCatalogConf = MetastoreConf.newMetastoreConf(); + String hmsUri = "thrift://localhost:" + hmsPort; + MetastoreConf.setVar(restCatalogConf, ConfVars.THRIFT_URIS, hmsUri); + MetastoreConf.setVar(restCatalogConf, ConfVars.WAREHOUSE, warehouseDir.getAbsolutePath()); + MetastoreConf.setVar(restCatalogConf, ConfVars.WAREHOUSE_EXTERNAL, warehouseDir.getAbsolutePath()); + + // Configure REST Catalog servlet + int restPort = MetaStoreTestUtils.findFreePort(); + MetastoreConf.setLongVar(restCatalogConf, ConfVars.CATALOG_SERVLET_PORT, restPort); + MetastoreConf.setVar(restCatalogConf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH, "iceberg"); + MetastoreConf.setVar(restCatalogConf, ConfVars.CATALOG_SERVLET_AUTH, "none"); + + // Start standalone REST Catalog server + restCatalogServer = new StandaloneRESTCatalogServer(restCatalogConf); + restCatalogServer.start(); + LOG.info("Started standalone REST Catalog server on port: {}", restCatalogServer.getPort()); + } + + @After + public void teardown() { + if (restCatalogServer != null) { + restCatalogServer.stop(); + } + if (hmsPort > 0) { + MetaStoreTestUtils.close(hmsPort); + } + if (hmsTempDir != null && hmsTempDir.exists()) { + deleteDirectory(hmsTempDir); + } + } + + @Test(timeout = 60000) + public void testHealthCheck() throws Exception { + LOG.info("=== Test: Health Check ==="); + + String healthUrl = "http://localhost:" + restCatalogServer.getPort() + "/health"; + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet request = new HttpGet(healthUrl); + try (CloseableHttpResponse response = httpClient.execute(request)) { + assertEquals("Health check should return 200", 200, response.getStatusLine().getStatusCode()); + LOG.info("Health check passed"); + } + } + } + + @Test(timeout = 60000) + public void testRESTCatalogConfig() throws Exception { + LOG.info("=== Test: REST Catalog Config Endpoint ==="); + + String configUrl = restCatalogServer.getRestEndpoint() + "/v1/config"; + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet request = new HttpGet(configUrl); + try (CloseableHttpResponse response = httpClient.execute(request)) { + assertEquals("Config endpoint should return 200", 200, response.getStatusLine().getStatusCode()); + + String responseBody = EntityUtils.toString(response.getEntity()); + LOG.info("Config response: {}", responseBody); + // ConfigResponse should contain endpoints, defaults, and overrides + assertTrue("Response should contain endpoints", responseBody.contains("endpoints")); + assertTrue("Response should be valid JSON", responseBody.startsWith("{") && responseBody.endsWith("}")); + } + } + } + + @Test(timeout = 60000) + public void testRESTCatalogNamespaceOperations() throws Exception { + LOG.info("=== Test: REST Catalog Namespace Operations ==="); + + String namespacesUrl = restCatalogServer.getRestEndpoint() + "/v1/namespaces"; + String namespaceName = "testdb"; + + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + // List namespaces (before creation) + HttpGet listRequest = new HttpGet(namespacesUrl); + listRequest.setHeader("Content-Type", "application/json"); + try (CloseableHttpResponse response = httpClient.execute(listRequest)) { + assertEquals("List namespaces should return 200", 200, response.getStatusLine().getStatusCode()); + } + + // Create namespace - REST Catalog API requires JSON body with namespace array + HttpPost createRequest = new HttpPost(namespacesUrl); + createRequest.setHeader("Content-Type", "application/json"); + String jsonBody = "{\"namespace\":[\"" + namespaceName + "\"]}"; + createRequest.setEntity(new StringEntity(jsonBody, "UTF-8")); + + try (CloseableHttpResponse response = httpClient.execute(createRequest)) { + assertEquals("Create namespace should return 200", 200, response.getStatusLine().getStatusCode()); + } + + // Verify namespace exists by checking it in the list + HttpGet listAfterRequest = new HttpGet(namespacesUrl); + listAfterRequest.setHeader("Content-Type", "application/json"); + try (CloseableHttpResponse response = httpClient.execute(listAfterRequest)) { + assertEquals("List namespaces after creation should return 200", + 200, response.getStatusLine().getStatusCode()); + + String responseBody = EntityUtils.toString(response.getEntity()); + LOG.info("Namespaces list response: {}", responseBody); + assertTrue("Response should contain created namespace", responseBody.contains(namespaceName)); + } + + // Verify namespace exists by getting it directly + String getNamespaceUrl = restCatalogServer.getRestEndpoint() + "/v1/namespaces/" + namespaceName; + HttpGet getRequest = new HttpGet(getNamespaceUrl); + getRequest.setHeader("Content-Type", "application/json"); + try (CloseableHttpResponse response = httpClient.execute(getRequest)) { + assertEquals("Get namespace should return 200", + 200, response.getStatusLine().getStatusCode()); + String responseBody = EntityUtils.toString(response.getEntity()); + LOG.info("Get namespace response: {}", responseBody); + assertTrue("Response should contain namespace", responseBody.contains(namespaceName)); + } + } + + LOG.info("Namespace operations passed"); + } + + @Test(timeout = 60000) + public void testServerPort() { + LOG.info("=== Test: Server Port ==="); + assertTrue("Server port should be > 0", restCatalogServer.getPort() > 0); + assertNotNull("REST endpoint should not be null", restCatalogServer.getRestEndpoint()); + LOG.info("Server port: {}, Endpoint: {}", restCatalogServer.getPort(), restCatalogServer.getRestEndpoint()); + } + + private void deleteDirectory(File directory) { + if (directory.exists()) { + File[] files = directory.listFiles(); + if (files != null) { + for (File file : files) { + if (file.isDirectory()) { + deleteDirectory(file); + } else { + file.delete(); + } + } + } + directory.delete(); + } + } +} diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java index 4b085e9d34cf..e5ad8b1c5881 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java @@ -69,9 +69,14 @@ public String getPath() { */ private Catalog createCatalog() { final Map properties = new TreeMap<>(); - MetastoreConf.setVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS, ""); final String configUri = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS); - if (configUri != null) { + // Clear THRIFT_URIS so HiveCatalog doesn't accidentally use Thrift connection + // when REST Catalog is embedded in HMS (same JVM). HiveCatalog reads from Configuration + // as fallback, so clearing it ensures it uses embedded connection when "uri" is not set. + MetastoreConf.setVar(configuration, MetastoreConf.ConfVars.THRIFT_URIS, ""); + // Only set "uri" property if THRIFT_URIS was configured (standalone mode) + // This tells HiveCatalog to use Thrift connection to external HMS + if (configUri != null && !configUri.isEmpty()) { properties.put("uri", configUri); } final String configWarehouse = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.WAREHOUSE); @@ -81,6 +86,8 @@ private Catalog createCatalog() { final String configExtWarehouse = MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.WAREHOUSE_EXTERNAL); if (configExtWarehouse != null) { properties.put("external-warehouse", configExtWarehouse); + // HiveCatalog reads this property directly from Configuration, not from properties map + configuration.set("hive.metastore.warehouse.external.dir", configExtWarehouse); } if (configuration.get(SERVLET_ID_KEY) != null) { // For the testing purpose. HiveCatalog caches a metastore client in a static field. As our tests can spin up diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java new file mode 100644 index 000000000000..ab10eaac192d --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/standalone/StandaloneRESTCatalogServer.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest.standalone; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.ServletServerBuilder; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.iceberg.rest.HMSCatalogFactory; +import org.eclipse.jetty.server.Server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Standalone REST Catalog Server. + * + *

This server runs independently of HMS and provides a REST API for Iceberg catalog operations. + * It connects to an external HMS instance via Thrift. + * + *

Designed for Kubernetes deployment with load balancer/API gateway in front: + *

+ *   Client → Load Balancer/API Gateway → StandaloneRESTCatalogServer → HMS
+ * 
+ * + *

Multiple instances can run behind a Kubernetes Service for load balancing. + */ +public class StandaloneRESTCatalogServer { + private static final Logger LOG = LoggerFactory.getLogger(StandaloneRESTCatalogServer.class); + + private final Configuration conf; + private Server server; + private int port; + + public StandaloneRESTCatalogServer(Configuration conf) { + this.conf = conf; + } + + /** + * Starts the standalone REST Catalog server. + */ + public void start() { + // Validate required configuration + String thriftUris = MetastoreConf.getVar(conf, ConfVars.THRIFT_URIS); + if (thriftUris == null || thriftUris.isEmpty()) { + throw new IllegalArgumentException("metastore.thrift.uris must be configured to connect to HMS"); + } + + int servletPort = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + String servletPath = MetastoreConf.getVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH); + + if (servletPath == null || servletPath.isEmpty()) { + servletPath = "iceberg"; // Default path + MetastoreConf.setVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH, servletPath); + } + + LOG.info("Starting Standalone REST Catalog Server"); + LOG.info(" HMS Thrift URIs: {}", thriftUris); + LOG.info(" Servlet Port: {}", servletPort); + LOG.info(" Servlet Path: /{}", servletPath); + + // Create servlet using factory + ServletServerBuilder.Descriptor catalogDescriptor = HMSCatalogFactory.createServlet(conf); + if (catalogDescriptor == null) { + throw new IllegalStateException("Failed to create REST Catalog servlet. " + + "Check that metastore.catalog.servlet.port and metastore.iceberg.catalog.servlet.path are configured."); + } + + // Create health check servlet + HealthCheckServlet healthServlet = new HealthCheckServlet(); + + // Build and start server + ServletServerBuilder builder = new ServletServerBuilder(conf); + builder.addServlet(catalogDescriptor); + builder.addServlet(servletPort, "health", healthServlet); + + server = builder.start(LOG); + if (server == null || !server.isStarted()) { + // Server failed to start - likely a port conflict + throw new IllegalStateException(String.format( + "Failed to start REST Catalog server on port %d. Port may already be in use. ", servletPort)); + } + + // Get actual port (may be auto-assigned) + port = catalogDescriptor.getPort(); + LOG.info("Standalone REST Catalog Server started successfully on port {}", port); + LOG.info(" REST Catalog endpoint: http://localhost:{}/{}", port, servletPath); + LOG.info(" Health check endpoint: http://localhost:{}/health", port); + } + + /** + * Stops the server. + */ + public void stop() { + if (server != null && server.isStarted()) { + try { + LOG.info("Stopping Standalone REST Catalog Server"); + server.stop(); + server.join(); + LOG.info("Standalone REST Catalog Server stopped"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Server stop interrupted", e); + } catch (Exception e) { + LOG.error("Error stopping server", e); + } + } + } + + /** + * Gets the port the server is listening on. + * @return the port number + */ + @VisibleForTesting + public int getPort() { + return port; + } + + + /** + * Gets the REST Catalog endpoint URL. + * @return the endpoint URL + */ + public String getRestEndpoint() { + String servletPath = MetastoreConf.getVar(conf, ConfVars.ICEBERG_CATALOG_SERVLET_PATH); + if (servletPath == null || servletPath.isEmpty()) { + servletPath = "iceberg"; + } + return "http://localhost:" + port + "/" + servletPath; + } + + /** + * Simple health check servlet for Kubernetes readiness/liveness probes. + */ + private static final class HealthCheckServlet extends HttpServlet { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) { + try { + resp.setContentType("application/json"); + resp.setStatus(HttpServletResponse.SC_OK); + resp.getWriter().println("{\"status\":\"healthy\"}"); + } catch (IOException e) { + LOG.warn("Failed to write health check response", e); + } + } + } + + /** + * Main method for running as a standalone application. + * @param args command line arguments + */ + public static void main(String[] args) { + Configuration conf = MetastoreConf.newMetastoreConf(); + + // Load configuration from command line args or environment + // Format: -Dkey=value or use system properties + for (String arg : args) { + if (arg.startsWith("-D")) { + String[] kv = arg.substring(2).split("=", 2); + if (kv.length == 2) { + conf.set(kv[0], kv[1]); + } + } + } + + StandaloneRESTCatalogServer server = new StandaloneRESTCatalogServer(conf); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + LOG.info("Shutdown hook triggered"); + server.stop(); + })); + + try { + server.start(); + LOG.info("Server running. Press Ctrl+C to stop."); + + // Keep server running + server.server.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Server stop interrupted", e); + } catch (Exception e) { + LOG.error("Failed to start server", e); + System.exit(1); + } + } +} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 46c20ce985cf..2ffe2351dee4 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -727,11 +727,21 @@ public static void startMetaStore(int port, HadoopThriftAuthBridge bridge, // optionally create and start the property and Iceberg REST server ServletServerBuilder builder = new ServletServerBuilder(conf); ServletServerBuilder.Descriptor properties = builder.addServlet(PropertyServlet.createServlet(conf)); - builder.addServlet(createCatalogServlet(conf)); + ServletServerBuilder.Descriptor catalogDescriptor = builder.addServlet(createCatalogServlet(conf)); + servletServer = builder.start(LOG); if (servletServer != null && properties != null) { propertyServletPort = properties.getPort(); } + + // If REST Catalog server was required but failed to start, fail HMS startup + int servletPort = MetastoreConf.getIntVar(conf, ConfVars.CATALOG_SERVLET_PORT); + boolean restCatalogRequired = catalogDescriptor != null && servletPort >= 0; + + if (restCatalogRequired && servletServer == null) { + throw new IllegalStateException(String.format( + "Failed to start embedded REST Catalog server on port %d. Port may already be in use.", servletPort)); + } // main server thriftServer.start();