diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index 10015f74837c..8f8befd83480 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -33,6 +33,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ZooKeeperHiveHelper; import org.apache.hadoop.hive.metastore.utils.StringUtils; @@ -1935,6 +1936,10 @@ public enum ConfVars { "hive.metastore.iceberg.catalog.cache.expiry", -1, "HMS Iceberg Catalog cache expiry." ), + ICEBERG_CATALOG_METRICS_REPORTERS("metastore.iceberg.catalog.metrics.reporters", + "hive.metastore.iceberg.catalog.metrics.reporters", "org.apache.iceberg.rest.metrics.LoggingMetricsReporter", + "A comma separated list of custom Iceberg Metrics Reporting plugins." + ), HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min", "hive.metastore.httpserver.threadpool.min", 8, "HMS embedded HTTP server minimum number of threads." @@ -2516,6 +2521,30 @@ public static Class getClass(Configuration conf, ConfVars var, conf.getClass(var.varname, defaultValue, xface); } + /** + * Get class instances based on a configuration value. + * + * @param conf configuration file to retrieve it from + * @param confVar variable to retrieve + * @return instances of the classes + */ + public static Class[] getClasses(Configuration conf, ConfVars confVar) { + Preconditions.checkArgument(confVar.defaultVal.getClass() == String.class); + Class defaultClass; + try { + defaultClass = Class.forName((String) confVar.defaultVal); + } catch (ClassNotFoundException e) { + throw new IllegalArgumentException( + String.format("Failed to load the the default value of %s: %s", confVar.defaultVal, confVar.varname), + e + ); + } + String val = conf.get(confVar.varname); + return val == null + ? conf.getClasses(confVar.hiveName, defaultClass) + : conf.getClasses(confVar.varname, defaultClass); + } + /** * Set the class name in the configuration file * @param conf configuration file to set it in diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java index 73d23ae5daf0..252bde85a5eb 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java @@ -20,6 +20,9 @@ package org.apache.iceberg.rest; import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.time.Clock; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -52,6 +55,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; +import org.apache.iceberg.rest.metrics.IcebergMetricsReporter; import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -72,12 +76,15 @@ import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Original @ RESTCatalogAdapter.java * Adaptor class to translate REST requests into {@link Catalog} API calls. */ public class HMSCatalogAdapter implements RESTClient { + private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class); private static final Splitter SLASH = Splitter.on('/'); private static final Map, Integer> EXCEPTION_ERROR_CODES = @@ -99,17 +106,21 @@ public class HMSCatalogAdapter implements RESTClient { .put(CommitStateUnknownException.class, 500) .buildOrThrow(); + private final String catalogName; private final Catalog catalog; private final SupportsNamespaces asNamespaceCatalog; private final ViewCatalog asViewCatalog; + private final List metricsReporters; + private final Clock clock = Clock.systemUTC(); - - public HMSCatalogAdapter(Catalog catalog) { + public HMSCatalogAdapter(String catalogName, Catalog catalog, List metricsReporters) { Preconditions.checkArgument(catalog instanceof SupportsNamespaces); Preconditions.checkArgument(catalog instanceof ViewCatalog); + this.catalogName = catalogName; this.catalog = catalog; this.asNamespaceCatalog = (SupportsNamespaces) catalog; this.asViewCatalog = (ViewCatalog) catalog; + this.metricsReporters = metricsReporters; } enum Route { @@ -315,9 +326,11 @@ private RESTResponse renameTable(Object body) { return null; } - private RESTResponse reportMetrics(Object body) { - // nothing to do here other than checking that we're getting the correct request - castRequest(ReportMetricsRequest.class, body); + private RESTResponse reportMetrics(Map vars, Object body) { + final TableIdentifier ident = identFromPathVars(vars); + final var report = castRequest(ReportMetricsRequest.class, body).report(); + final var receivedAt = clock.instant(); + metricsReporters.forEach(reporter -> reporter.report(catalogName, ident, report, receivedAt)); return null; } @@ -456,7 +469,7 @@ private T handleRequest( return (T) renameTable(body); case REPORT_METRICS: - return (T) reportMetrics(body); + return (T) reportMetrics(vars, body); case COMMIT_TRANSACTION: return (T) commitTransaction(body); @@ -576,6 +589,13 @@ public T postForm( @Override public void close() { // The caller is responsible for closing the underlying catalog backing this REST catalog. + for (IcebergMetricsReporter reporter : metricsReporters) { + try { + reporter.close(); + } catch (IOException e) { + LOG.error("Failed to close metrics reporter: {}", reporter, e); + } + } } private static class BadResponseType extends RuntimeException { diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java index d21f239f3416..54e0ec20267c 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg.rest; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -32,6 +34,7 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.rest.metrics.IcebergMetricsReporter; /** * Catalog & servlet factory. @@ -112,7 +115,21 @@ private HttpServlet createServlet(Catalog catalog) { // Iceberg REST client uses "catalog" by default List scopes = Collections.singletonList("catalog"); ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration, req -> scopes); - return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog))); + String catalogName = MetastoreConf.getVar(configuration, ConfVars.CATALOG_DEFAULT); + List reporters = createReporters(); + return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalogName, catalog, reporters))); + } + + private List createReporters() { + final var classes = MetastoreConf.getClasses(configuration, ConfVars.ICEBERG_CATALOG_METRICS_REPORTERS); + return Arrays.stream(classes).map(clazz -> { + try { + final var constructor = clazz.getDeclaredConstructor(Configuration.class); + return (IcebergMetricsReporter) constructor.newInstance(configuration); + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalArgumentException("Failed to instantiate IcebergMetricsReporter: " + clazz.getName(), e); + } + }).toList(); } /** diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java index 6140f40b2de5..e608ba5229a6 100644 --- a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogServlet.java @@ -100,6 +100,12 @@ private Consumer handle(HttpServletResponse response) { }; } + @Override + public void destroy() { + super.destroy(); + restCatalogAdapter.close(); + } + public static class ServletRequestContext { private HTTPMethod method; private String path; @@ -147,8 +153,15 @@ static ServletRequestContext from(HttpServletRequest request) throws IOException Route route = routeContext.first(); Object requestBody = null; if (route.requestClass() != null) { - requestBody = - RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass()); + try { + requestBody = RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass()); + } catch (Exception e) { + return new ServletRequestContext(ErrorResponse.builder() + .responseCode(400) + .withType(e.getClass().getSimpleName()) + .withMessage(e.getMessage()) + .build()); + } } Map queryParams = diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/metrics/IcebergMetricsReporter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/metrics/IcebergMetricsReporter.java new file mode 100644 index 000000000000..d7f781e36233 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/metrics/IcebergMetricsReporter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.rest.metrics; + +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.metrics.MetricsReport; + +import java.io.Closeable; +import java.time.Instant; + +/** + * An event reporter for Iceberg metrics. + */ +public interface IcebergMetricsReporter extends Closeable { + /** + * Report an event. + * + * @param catalog the catalog name + * @param identifier the table identifier + * @param report the event + * @param receivedAt the timestamp when the Iceberg REST Catalog received the event + */ + void report(String catalog, TableIdentifier identifier, MetricsReport report, Instant receivedAt); +} diff --git a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/metrics/LoggingMetricsReporter.java b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/metrics/LoggingMetricsReporter.java new file mode 100644 index 000000000000..afc37cd40665 --- /dev/null +++ b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/metrics/LoggingMetricsReporter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.rest.metrics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.metrics.MetricsReport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; + +/** + * A metrics reporter that logs events. + */ +public class LoggingMetricsReporter implements IcebergMetricsReporter { + private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class); + + public LoggingMetricsReporter(Configuration conf) { + } + + @Override + public void report(String catalog, TableIdentifier identifier, MetricsReport report, Instant receivedAt) { + LOG.info("Event reported at {}: catalog={}, table={}, report={}", receivedAt, catalog, identifier, report); + } + + @Override + public void close() { + LOG.info("Closing {}", getClass().getName()); + } +} diff --git a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java index 836e18cb8e77..13e322bcd4b0 100644 --- a/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java +++ b/standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/extension/RESTCatalogServer.java @@ -19,6 +19,7 @@ package org.apache.iceberg.rest.extension; +import java.nio.file.Files; import java.nio.file.Path; import java.util.UUID; import org.apache.hadoop.conf.Configuration; @@ -60,6 +61,9 @@ public void start(Configuration conf) throws Exception { String uniqueTestKey = String.format("RESTCatalogServer_%s", UUID.randomUUID()); warehouseDir = Path.of(MetaStoreTestUtils.getTestWarehouseDir(uniqueTestKey)); + Files.createDirectories(warehouseDir); + System.setProperty("derby.stream.error.file", + warehouseDir.resolve("derby.log").toAbsolutePath().toString()); String jdbcUrl = String.format("jdbc:derby:memory:%s;create=true", warehouseDir.resolve("metastore_db").toAbsolutePath()); MetastoreConf.setVar(conf, ConfVars.CONNECT_URL_KEY, jdbcUrl);