Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalFileSystemFactory;
import org.apache.flink.core.plugin.MetricsAware;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.WrappingProxy;
Expand Down Expand Up @@ -316,6 +318,53 @@ public static List<FileSystemFactory> getRegisteredFileSystemFactories() {
}
}

/**
 * Distributes a process-level {@link MetricGroup} to the registered {@link FileSystemFactory
 * file system factories} that implement {@link MetricsAware}.
 *
 * <p>File system setup happens in two steps. {@link #initialize(Configuration, PluginManager)}
 * runs at process startup, when no {@code MetricRegistry} exists yet, so metrics are attached
 * later through this method once the registry and a process-level group are available. Only
 * the TaskManager and JobManager entrypoints are expected to call it; contexts that have no
 * process-level group (CLI, HistoryServer, YARN client) never do, and their file system
 * plugins keep working without emitting metrics.
 *
 * <p>Every opted-in factory is handed the child group {@code <process>.filesystem}. Repeated
 * invocations are intended to be harmless: they rely on {@link MetricGroup#addGroup}
 * returning the same child for the same parent and name, so no duplicate metrics get
 * registered. Factories that are not {@link MetricsAware} are ignored, and a factory whose
 * {@code setMetricGroup} throws is logged at WARN level without affecting the others.
 *
 * @param processMetricGroup the process-level metric group under which the file system
 *     metrics are registered; must not be {@code null}.
 */
@Internal
public static void attachMetrics(MetricGroup processMetricGroup) {
    checkNotNull(processMetricGroup, "processMetricGroup");
    LOCK.lock();
    try {
        final MetricGroup fileSystemGroup = processMetricGroup.addGroup("filesystem");
        for (FileSystemFactory candidate : FS_FACTORIES.values()) {
            // No unwrapping is needed here: a plugin-loaded factory is registered as a
            // PluginFileSystemFactory, which implements MetricsAware itself and relays the
            // group to the factory it wraps under the plugin classloader.
            if (!(candidate instanceof MetricsAware)) {
                continue;
            }
            final MetricsAware metricsAware = (MetricsAware) candidate;
            try {
                metricsAware.setMetricGroup(fileSystemGroup);
            } catch (Throwable failure) {
                // Best effort by design: a faulty plugin must not abort process startup,
                // so the failure is only reported and the loop moves on.
                LOG.warn(
                        "Failed to attach metrics to file system factory {}",
                        candidate.getClass().getName(),
                        failure);
            }
        }
    } finally {
        LOCK.unlock();
    }
}

/**
* Initializes the shared file system settings.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.core.fs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.MetricsAware;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.WrappingProxy;

Expand All @@ -30,7 +32,7 @@
* {@link FileSystem} operations.
*/
public class PluginFileSystemFactory
implements FileSystemFactory, WrappingProxy<FileSystemFactory> {
implements FileSystemFactory, WrappingProxy<FileSystemFactory>, MetricsAware {
private final FileSystemFactory inner;
private final ClassLoader loader;

Expand Down Expand Up @@ -58,6 +60,20 @@ public void configure(final Configuration config) {
inner.configure(config);
}

/**
 * Relays the metric group to the wrapped factory, provided that factory implements {@link
 * MetricsAware}; for any other wrapped factory the call is a no-op.
 *
 * <p>The delegate is invoked inside a {@link TemporaryClassLoaderContext} created for the
 * plugin classloader, the same way the other calls routed through this wrapper are isolated.
 *
 * @param metricGroup the group to pass on to the wrapped factory.
 */
@Override
public void setMetricGroup(final MetricGroup metricGroup) {
    if (!(inner instanceof MetricsAware)) {
        // The wrapped factory did not opt into metrics; nothing to forward.
        return;
    }
    final MetricsAware metricsAwareInner = (MetricsAware) inner;
    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
        metricsAwareInner.setMetricGroup(metricGroup);
    }
}

@Override
public FileSystem create(final URI fsUri) throws IOException {
try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.plugin;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.MetricGroup;

/**
* Opt-in capability interface for {@link Plugin}s that want to register Flink metrics.
*
* <p>This is an optional extension to the plugin SPI rather than a pure marker: it declares a
* single callback, {@link #setMetricGroup(MetricGroup)}. A plugin declares {@code implements
* SomePluginSpi, MetricsAware}; plugins that do not implement it are unaffected and emit no
* metrics.
*
* <p><b>Two-phase init contract.</b> The runtime invokes {@link #setMetricGroup(MetricGroup)} after
* {@link Plugin#configure(org.apache.flink.configuration.Configuration)} and before any operation
* that would emit a metric. The call happens only from runtime entrypoints that own a process-level
* {@link MetricGroup} (TaskManager and JobManager), via {@link
* org.apache.flink.core.fs.FileSystem#attachMetrics(MetricGroup)}. Contexts without such a group
* (CLI, HistoryServer, YARN client, embedded usage) never call it, in which case the plugin must
* continue to operate normally and emit no metrics.
*
* <p><b>Idempotency.</b> {@code setMetricGroup} may be invoked more than once (for example if
* metrics are re-attached). Implementations must treat repeated invocations idempotently: a call
* with the same {@link MetricGroup} must not register duplicate metrics, and a call with a
* different group should re-scope subsequently created metrics to the new group.
*
* <p><b>Ownership.</b> The {@link MetricGroup} is owned by the runtime; plugins must not close it.
* They may call {@link MetricGroup#addGroup} on it freely to build nested label scopes.
*/
@PublicEvolving
public interface MetricsAware {

/**
* Hands the plugin a runtime-owned {@link MetricGroup} to register its metrics against.
*
* <p>May be called more than once, and may never be called at all; see the two-phase init and
* idempotency rules in the interface documentation.
*
* @param metricGroup the group to register metrics under; never {@code null}.
*/
void setMetricGroup(MetricGroup metricGroup);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.fs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.MetricsAware;
import org.apache.flink.core.plugin.TestingPluginManager;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/**
 * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link MetricsAware} two-phase
 * init contract.
 *
 * <p>The headline case is {@link #attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin
 * file systems are registered wrapped in a {@link PluginFileSystemFactory}. That wrapper
 * implements {@link MetricsAware} itself and forwards the metric group to the factory it wraps,
 * so {@code attachMetrics} reaches plugin-loaded factories with a plain {@code instanceof} check
 * and without unwrapping the proxy. If the wrapper stopped forwarding, the metric group would
 * silently never be delivered and no metrics would ever be emitted.
 */
class FileSystemAttachMetricsTest {

    @AfterEach
    void resetFileSystems() {
        // Restore the default, plugin-less factory registry so other tests are unaffected.
        FileSystem.initialize(new Configuration(), null);
    }

    @Test
    void attachMetricsReachesPluginLoadedMetricsAwareFactory() {
        RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("metrics-test-fs");
        initializeWithPlugins(factory);

        RecordingMetricGroup processGroup = new RecordingMetricGroup();
        FileSystem.attachMetrics(processGroup);

        // Forwarded by the PluginFileSystemFactory wrapper and invoked exactly once.
        assertThat(factory.setMetricGroupCalls).hasValue(1);
        // The group handed to the factory is the "filesystem" child of the process group, not the
        // process group itself.
        assertThat(processGroup.childGroupNames).containsExactly("filesystem");
        assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup);
    }

    @Test
    void attachMetricsSkipsFactoriesThatAreNotMetricsAware() {
        initializeWithPlugins(new PlainFactory("plain-test-fs"));

        assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup()))
                .doesNotThrowAnyException();
    }

    @Test
    void attachMetricsIsResilientToAFactoryThatThrows() {
        RecordingMetricsAwareFactory ok = new RecordingMetricsAwareFactory("ok-test-fs");
        initializeWithPlugins(new ThrowingMetricsAwareFactory("throwing-test-fs"), ok);

        // A misbehaving plugin must never break process startup, and well-behaved factories must
        // still receive their group regardless of iteration order.
        assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup()))
                .doesNotThrowAnyException();
        assertThat(ok.setMetricGroupCalls).hasValue(1);
    }

    @Test
    void attachMetricsDoesNotThrowWhenInvokedRepeatedly() {
        RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("idem-test-fs");
        initializeWithPlugins(factory);

        MetricGroup group = new UnregisteredMetricsGroup();
        assertThatCode(
                        () -> {
                            FileSystem.attachMetrics(group);
                            FileSystem.attachMetrics(group);
                        })
                .doesNotThrowAnyException();
        // Each invocation hands the group to the factory again; de-duplicating metrics on
        // repeated calls is the responsibility of the MetricsAware implementation.
        assertThat(factory.setMetricGroupCalls).hasValue(2);
    }

    @Test
    void setMetricGroupIsNotInvokedWhenAttachMetricsIsNeverCalled() {
        RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("never-test-fs");
        initializeWithPlugins(factory);

        assertThat(factory.setMetricGroupCalls).hasValue(0);
    }

    @Test
    void pluginFileSystemFactoryForwardsMetricGroupToInner() {
        RecordingMetricsAwareFactory inner = new RecordingMetricsAwareFactory("wrapped-fs");
        FileSystemFactory wrapper = PluginFileSystemFactory.of(inner);

        // The wrapper must itself be MetricsAware so attachMetrics reaches it without unwrapping.
        assertThat(wrapper).isInstanceOf(MetricsAware.class);

        MetricGroup group = new UnregisteredMetricsGroup();
        ((MetricsAware) wrapper).setMetricGroup(group);

        assertThat(inner.setMetricGroupCalls).hasValue(1);
        assertThat(inner.receivedGroup.get()).isSameAs(group);
    }

    /** Re-initializes {@link FileSystem} so that the given factories are loaded as plugins. */
    private static void initializeWithPlugins(FileSystemFactory... factories) {
        Map<Class<?>, Iterator<?>> plugins = new HashMap<>();
        plugins.put(FileSystemFactory.class, Arrays.asList(factories).iterator());
        FileSystem.initialize(new Configuration(), new TestingPluginManager(plugins));
    }

    // ------------------------------------------------------------------------
    //  test factories
    // ------------------------------------------------------------------------

    /** Minimal factory that does not opt into metrics and cannot create file systems. */
    private static class PlainFactory implements FileSystemFactory {
        private final String scheme;

        PlainFactory(String scheme) {
            this.scheme = scheme;
        }

        @Override
        public void configure(Configuration config) {}

        @Override
        public String getScheme() {
            return scheme;
        }

        @Override
        public FileSystem create(URI fsUri) throws IOException {
            throw new UnsupportedOperationException("not needed for this test");
        }
    }

    /** Metrics-aware factory that counts the callbacks and remembers the last group it got. */
    private static class RecordingMetricsAwareFactory extends PlainFactory implements MetricsAware {
        final AtomicInteger setMetricGroupCalls = new AtomicInteger();
        final AtomicReference<MetricGroup> receivedGroup = new AtomicReference<>();

        RecordingMetricsAwareFactory(String scheme) {
            super(scheme);
        }

        @Override
        public void setMetricGroup(MetricGroup metricGroup) {
            setMetricGroupCalls.incrementAndGet();
            receivedGroup.set(metricGroup);
        }
    }

    /** Metrics-aware factory whose callback always fails, simulating a misbehaving plugin. */
    private static class ThrowingMetricsAwareFactory extends PlainFactory implements MetricsAware {
        ThrowingMetricsAwareFactory(String scheme) {
            super(scheme);
        }

        @Override
        public void setMetricGroup(MetricGroup metricGroup) {
            throw new RuntimeException("intentional failure from a misbehaving plugin");
        }
    }

    /**
     * Records the name passed to every {@code addGroup(String)} call made on this group or on any
     * group derived from it; the backing list is shared with the children it returns.
     */
    private static class RecordingMetricGroup extends UnregisteredMetricsGroup {
        final List<String> childGroupNames;

        RecordingMetricGroup() {
            this(Collections.synchronizedList(new ArrayList<>()));
        }

        private RecordingMetricGroup(List<String> childGroupNames) {
            this.childGroupNames = childGroupNames;
        }

        @Override
        public MetricGroup addGroup(String name) {
            childGroupNames.add(name);
            return new RecordingMetricGroup(childGroupNames);
        }
    }
}
Loading