Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
Expand Down Expand Up @@ -1935,6 +1936,10 @@ public enum ConfVars {
"hive.metastore.iceberg.catalog.cache.expiry", -1,
"HMS Iceberg Catalog cache expiry."
),
ICEBERG_CATALOG_METRICS_REPORTERS("metastore.iceberg.catalog.metrics.reporters",
"hive.metastore.iceberg.catalog.metrics.reporters", "org.apache.iceberg.rest.metrics.LoggingMetricsReporter",
"A comma separated list of custom Iceberg Metrics Reporting plugins."
),
HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
"hive.metastore.httpserver.threadpool.min", 8,
"HMS embedded HTTP server minimum number of threads."
Expand Down Expand Up @@ -2516,6 +2521,30 @@ public static <I> Class<? extends I> getClass(Configuration conf, ConfVars var,
conf.getClass(var.varname, defaultValue, xface);
}

/**
* Get class instances based on a configuration value.
*
* @param conf configuration file to retrieve it from
* @param confVar variable to retrieve
* @return instances of the classes
*/
public static Class<?>[] getClasses(Configuration conf, ConfVars confVar) {
Preconditions.checkArgument(confVar.defaultVal.getClass() == String.class);
Class<?> defaultClass;
try {
defaultClass = Class.forName((String) confVar.defaultVal);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(
String.format("Failed to load the the default value of %s: %s", confVar.defaultVal, confVar.varname),
e
);
}
String val = conf.get(confVar.varname);
return val == null
? conf.getClasses(confVar.hiveName, defaultClass)
: conf.getClasses(confVar.varname, defaultClass);
}

/**
* Set the class name in the configuration file
* @param conf configuration file to set it in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.apache.iceberg.rest;

import com.google.common.base.Preconditions;

import java.io.IOException;
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -52,6 +55,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
import org.apache.iceberg.rest.metrics.IcebergMetricsReporter;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
Expand All @@ -72,12 +76,15 @@
import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Original @ <a href="https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java">RESTCatalogAdapter.java</a>
* Adaptor class to translate REST requests into {@link Catalog} API calls.
*/
public class HMSCatalogAdapter implements RESTClient {
private static final Logger LOG = LoggerFactory.getLogger(HMSCatalogAdapter.class);
private static final Splitter SLASH = Splitter.on('/');

private static final Map<Class<? extends Exception>, Integer> EXCEPTION_ERROR_CODES =
Expand All @@ -99,17 +106,21 @@ public class HMSCatalogAdapter implements RESTClient {
.put(CommitStateUnknownException.class, 500)
.buildOrThrow();

private final String catalogName;
private final Catalog catalog;
private final SupportsNamespaces asNamespaceCatalog;
private final ViewCatalog asViewCatalog;
private final List<IcebergMetricsReporter> metricsReporters;
private final Clock clock = Clock.systemUTC();


public HMSCatalogAdapter(Catalog catalog) {
public HMSCatalogAdapter(String catalogName, Catalog catalog, List<IcebergMetricsReporter> metricsReporters) {
Preconditions.checkArgument(catalog instanceof SupportsNamespaces);
Preconditions.checkArgument(catalog instanceof ViewCatalog);
this.catalogName = catalogName;
this.catalog = catalog;
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
this.asViewCatalog = (ViewCatalog) catalog;
this.metricsReporters = metricsReporters;
}

enum Route {
Expand Down Expand Up @@ -315,9 +326,11 @@ private RESTResponse renameTable(Object body) {
return null;
}

private RESTResponse reportMetrics(Object body) {
// nothing to do here other than checking that we're getting the correct request
castRequest(ReportMetricsRequest.class, body);
private RESTResponse reportMetrics(Map<String, String> vars, Object body) {
final TableIdentifier ident = identFromPathVars(vars);
final var report = castRequest(ReportMetricsRequest.class, body).report();
final var receivedAt = clock.instant();
metricsReporters.forEach(reporter -> reporter.report(catalogName, ident, report, receivedAt));
return null;
}

Expand Down Expand Up @@ -456,7 +469,7 @@ private <T extends RESTResponse> T handleRequest(
return (T) renameTable(body);

case REPORT_METRICS:
return (T) reportMetrics(body);
return (T) reportMetrics(vars, body);

case COMMIT_TRANSACTION:
return (T) commitTransaction(body);
Expand Down Expand Up @@ -576,6 +589,13 @@ public <T extends RESTResponse> T postForm(
@Override
public void close() {
// The caller is responsible for closing the underlying catalog backing this REST catalog.
for (IcebergMetricsReporter reporter : metricsReporters) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case if multiple reported are present and there is exception in closing 1 reporter, rest all will not be closed. Can we do something like this ?

public void close() {
threadLocalMap.values().forEach(this::closeQuietly);
}
private void closeQuietly(AutoCloseable autoCloseable) {
try {
autoCloseable.close();
} catch (Exception e) {
LOG.warn("Error while closing resource.", e);
}
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. I found the close method would not be invoked, so I added a Servlet lifecycle method and logging for safety.
69c2efd

try {
reporter.close();
} catch (IOException e) {
LOG.error("Failed to close metrics reporter: {}", reporter, e);
}
}
}

private static class BadResponseType extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.rest;

import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -32,6 +34,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.rest.metrics.IcebergMetricsReporter;

/**
* Catalog &amp; servlet factory.
Expand Down Expand Up @@ -112,7 +115,21 @@ private HttpServlet createServlet(Catalog catalog) {
// Iceberg REST client uses "catalog" by default
List<String> scopes = Collections.singletonList("catalog");
ServletSecurity security = new ServletSecurity(AuthType.fromString(authType), configuration, req -> scopes);
return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalog)));
String catalogName = MetastoreConf.getVar(configuration, ConfVars.CATALOG_DEFAULT);
List<IcebergMetricsReporter> reporters = createReporters();
return security.proxy(new HMSCatalogServlet(new HMSCatalogAdapter(catalogName, catalog, reporters)));
}

private List<IcebergMetricsReporter> createReporters() {
final var classes = MetastoreConf.getClasses(configuration, ConfVars.ICEBERG_CATALOG_METRICS_REPORTERS);
return Arrays.stream(classes).map(clazz -> {
try {
final var constructor = clazz.getDeclaredConstructor(Configuration.class);
return (IcebergMetricsReporter) constructor.newInstance(configuration);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new IllegalArgumentException("Failed to instantiate IcebergMetricsReporter: " + clazz.getName(), e);
}
}).toList();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ private Consumer<ErrorResponse> handle(HttpServletResponse response) {
};
}

@Override
public void destroy() {
super.destroy();
restCatalogAdapter.close();
}

public static class ServletRequestContext {
private HTTPMethod method;
private String path;
Expand Down Expand Up @@ -147,8 +153,15 @@ static ServletRequestContext from(HttpServletRequest request) throws IOException
Route route = routeContext.first();
Object requestBody = null;
if (route.requestClass() != null) {
requestBody =
RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass());
try {
requestBody = RESTObjectMapper.mapper().readValue(request.getReader(), route.requestClass());
} catch (Exception e) {
return new ServletRequestContext(ErrorResponse.builder()
.responseCode(400)
.withType(e.getClass().getSimpleName())
.withMessage(e.getMessage())
.build());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I happened to find an invalid body throws a 500 error

}
}

Map<String, String> queryParams =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.rest.metrics;

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;

import java.io.Closeable;
import java.time.Instant;

/**
* An event reporter for Iceberg metrics.
*/
public interface IcebergMetricsReporter extends Closeable {
/**
* Report an event.
*
* @param catalog the catalog name
* @param identifier the table identifier
* @param report the event
* @param receivedAt the timestamp when the Iceberg REST Catalog received the event
*/
void report(String catalog, TableIdentifier identifier, MetricsReport report, Instant receivedAt);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.rest.metrics;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.metrics.MetricsReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;

/**
* A metrics reporter that logs events.
*/
public class LoggingMetricsReporter implements IcebergMetricsReporter {
private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);

public LoggingMetricsReporter(Configuration conf) {
}

@Override
public void report(String catalog, TableIdentifier identifier, MetricsReport report, Instant receivedAt) {
LOG.info("Event reported at {}: catalog={}, table={}, report={}", receivedAt, catalog, identifier, report);
}

@Override
public void close() {
LOG.info("Closing {}", getClass().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.rest.extension;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -60,6 +61,9 @@ public void start(Configuration conf) throws Exception {

String uniqueTestKey = String.format("RESTCatalogServer_%s", UUID.randomUUID());
warehouseDir = Path.of(MetaStoreTestUtils.getTestWarehouseDir(uniqueTestKey));
Files.createDirectories(warehouseDir);
System.setProperty("derby.stream.error.file",
warehouseDir.resolve("derby.log").toAbsolutePath().toString());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log was generated in standalone-metastore/metastore-rest-catalog/derby.log originally, which is not git-ignored.

String jdbcUrl = String.format("jdbc:derby:memory:%s;create=true",
warehouseDir.resolve("metastore_db").toAbsolutePath());
MetastoreConf.setVar(conf, ConfVars.CONNECT_URL_KEY, jdbcUrl);
Expand Down
Loading