diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 3699b2a2962d0..d9c347bc89d6e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -32,7 +32,9 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.fs.local.LocalFileSystemFactory; +import org.apache.flink.core.plugin.MetricsAware; import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.flink.util.WrappingProxy; @@ -316,6 +318,53 @@ public static List getRegisteredFileSystemFactories() { } } + /** + * Hands a runtime-owned, process-level {@link MetricGroup} to every registered {@link + * FileSystemFactory} that opts into metrics via {@link MetricsAware}. + * + *

This is the second phase of file system initialization. {@link #initialize(Configuration, + * PluginManager)} runs at process startup, before the {@code MetricRegistry} exists; this + * method is therefore invoked separately, once the registry and a process-level {@link + * MetricGroup} are available. It is called from the TaskManager and JobManager entrypoints + * only. Contexts without a process-level {@link MetricGroup} (CLI, HistoryServer, YARN client) + * simply never call it, and their file system plugins continue to operate without emitting + * metrics. + * + *

The call is idempotent: factories receive a child group {@code .filesystem}, and + * {@link MetricGroup#addGroup} returns the same child on repeated calls with the same parent, + * so re-invocation does not register duplicate metrics. Factories that do not implement {@link + * MetricsAware} are skipped. + * + * @param processMetricGroup the process-level metric group to register file system metrics + * under. + */ + @Internal + public static void attachMetrics(MetricGroup processMetricGroup) { + checkNotNull(processMetricGroup, "processMetricGroup"); + LOCK.lock(); + try { + final MetricGroup fsGroup = processMetricGroup.addGroup("filesystem"); + for (FileSystemFactory factory : FS_FACTORIES.values()) { + // Plugin-loaded factories are wrapped in a PluginFileSystemFactory, which is itself + // MetricsAware and forwards setMetricGroup to the inner factory under the plugin + // classloader, so this plain instanceof reaches both wrapped and direct factories. + if (factory instanceof MetricsAware) { + try { + ((MetricsAware) factory).setMetricGroup(fsGroup); + } catch (Throwable t) { + // A misbehaving plugin must never break process startup. + LOG.warn( + "Failed to attach metrics to file system factory {}", + factory.getClass().getName(), + t); + } + } + } + } finally { + LOCK.unlock(); + } + } + /** * Initializes the shared file system settings. * diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java index e7eb5892363f4..c2ff0d5e89352 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java @@ -18,6 +18,8 @@ package org.apache.flink.core.fs; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.MetricsAware; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.TemporaryClassLoaderContext; import org.apache.flink.util.WrappingProxy; @@ -30,7 +32,7 @@ * {@link FileSystem} operations. */ public class PluginFileSystemFactory - implements FileSystemFactory, WrappingProxy { + implements FileSystemFactory, WrappingProxy, MetricsAware { private final FileSystemFactory inner; private final ClassLoader loader; @@ -58,6 +60,20 @@ public void configure(final Configuration config) { inner.configure(config); } + /** + * Forwards the metric group to the wrapped factory if it opts into metrics, using the plugin + * classloader so the factory observes the same isolation as every other call routed through + * this wrapper. + */ + @Override + public void setMetricGroup(final MetricGroup metricGroup) { + if (inner instanceof MetricsAware) { + try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { + ((MetricsAware) inner).setMetricGroup(metricGroup); + } + } + } + @Override public FileSystem create(final URI fsUri) throws IOException { try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) { diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java b/flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java new file mode 100644 index 0000000000000..9187272c5a2f2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/MetricsAware.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.metrics.MetricGroup; + +/** + * Capability marker for {@link Plugin}s that want to register Flink metrics. + * + *

This is an opt-in extension to the plugin SPI. A plugin declares {@code implements + * SomePluginSpi, MetricsAware}; plugins that do not implement it are byte-for-byte unchanged and + * emit no metrics. + * + *

Two-phase init contract. The runtime invokes {@link #setMetricGroup(MetricGroup)} after + * {@link Plugin#configure(org.apache.flink.configuration.Configuration)} and before any operation + * that would emit a metric. The call happens only from runtime entrypoints that own a process-level + * {@link MetricGroup} (TaskManager and JobManager), via {@link + * org.apache.flink.core.fs.FileSystem#attachMetrics(MetricGroup)}. Contexts without such a group + * (CLI, HistoryServer, YARN client, embedded usage) never call it, in which case the plugin must + * continue to operate normally and emit no metrics. + * + *

Idempotency. {@code setMetricGroup} may be invoked more than once (for example if + * metrics are re-attached). Implementations must treat repeated invocations idempotently: a call + * with the same {@link MetricGroup} must not register duplicate metrics, and a call with a + * different group should re-scope subsequently created metrics to the new group. + * + *

The {@link MetricGroup} is owned by the runtime; plugins must not close it. They may call + * {@link MetricGroup#addGroup} on it freely to build nested label scopes. + */ +@PublicEvolving +public interface MetricsAware { + + /** + * Hands the plugin a runtime-owned {@link MetricGroup} to register its metrics against. See the + * class-level two-phase init contract. + * + * @param metricGroup the group to register metrics under; never {@code null}. + */ + void setMetricGroup(MetricGroup metricGroup); +} diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java new file mode 100644 index 0000000000000..907004ccbb737 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/FileSystemAttachMetricsTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.MetricsAware; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** + * Tests for {@link FileSystem#attachMetrics(MetricGroup)} and the {@link MetricsAware} two-phase + * init contract. + * + *

The headline case is {@link #attachMetricsReachesPluginLoadedMetricsAwareFactory()}: plugin + * file systems are registered wrapped in a {@link PluginFileSystemFactory}, which does not + * itself implement {@link MetricsAware}. {@code attachMetrics} must unwrap the proxy to reach the + * real factory, otherwise the metric group is silently never delivered and no metrics are ever + * emitted. + */ +class FileSystemAttachMetricsTest { + + @AfterEach + void resetFileSystems() { + // Restore the default, plugin-less factory registry so other tests are unaffected. + FileSystem.initialize(new Configuration(), null); + } + + @Test + void attachMetricsReachesPluginLoadedMetricsAwareFactory() { + RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("metrics-test-fs"); + initializeWithPlugins(factory); + + RecordingMetricGroup processGroup = new RecordingMetricGroup(); + FileSystem.attachMetrics(processGroup); + + // Unwrapped through PluginFileSystemFactory and invoked exactly once. + assertThat(factory.setMetricGroupCalls).hasValue(1); + // The group handed to the factory is the "filesystem" child of the process group, not the + // process group itself. + assertThat(processGroup.childGroupNames).containsExactly("filesystem"); + assertThat(factory.receivedGroup.get()).isNotNull().isNotSameAs(processGroup); + } + + @Test + void attachMetricsSkipsFactoriesThatAreNotMetricsAware() { + initializeWithPlugins(new PlainFactory("plain-test-fs")); + + assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup())) + .doesNotThrowAnyException(); + } + + @Test + void attachMetricsIsResilientToAFactoryThatThrows() { + RecordingMetricsAwareFactory ok = new RecordingMetricsAwareFactory("ok-test-fs"); + initializeWithPlugins(new ThrowingMetricsAwareFactory("throwing-test-fs"), ok); + + // A misbehaving plugin must never break process startup, and well-behaved factories must + // still receive their group regardless of iteration order. + assertThatCode(() -> FileSystem.attachMetrics(new UnregisteredMetricsGroup())) + .doesNotThrowAnyException(); + assertThat(ok.setMetricGroupCalls).hasValue(1); + } + + @Test + void attachMetricsDoesNotThrowWhenInvokedRepeatedly() { + RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("idem-test-fs"); + initializeWithPlugins(factory); + + MetricGroup group = new UnregisteredMetricsGroup(); + assertThatCode( + () -> { + FileSystem.attachMetrics(group); + FileSystem.attachMetrics(group); + }) + .doesNotThrowAnyException(); + } + + @Test + void setMetricGroupIsNotInvokedWhenAttachMetricsIsNeverCalled() { + RecordingMetricsAwareFactory factory = new RecordingMetricsAwareFactory("never-test-fs"); + initializeWithPlugins(factory); + + assertThat(factory.setMetricGroupCalls).hasValue(0); + } + + @Test + void pluginFileSystemFactoryForwardsMetricGroupToInner() { + RecordingMetricsAwareFactory inner = new RecordingMetricsAwareFactory("wrapped-fs"); + FileSystemFactory wrapper = PluginFileSystemFactory.of(inner); + + // The wrapper must itself be MetricsAware so attachMetrics reaches it without unwrapping. + assertThat(wrapper).isInstanceOf(MetricsAware.class); + + MetricGroup group = new UnregisteredMetricsGroup(); + ((MetricsAware) wrapper).setMetricGroup(group); + + assertThat(inner.setMetricGroupCalls).hasValue(1); + assertThat(inner.receivedGroup.get()).isSameAs(group); + } + + private static void initializeWithPlugins(FileSystemFactory... factories) { + Map, Iterator> plugins = new HashMap<>(); + plugins.put(FileSystemFactory.class, Arrays.asList(factories).iterator()); + FileSystem.initialize(new Configuration(), new TestingPluginManager(plugins)); + } + + // ------------------------------------------------------------------------ + // test factories + // ------------------------------------------------------------------------ + + private static class PlainFactory implements FileSystemFactory { + private final String scheme; + + PlainFactory(String scheme) { + this.scheme = scheme; + } + + @Override + public void configure(Configuration config) {} + + @Override + public String getScheme() { + return scheme; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + throw new UnsupportedOperationException("not needed for this test"); + } + } + + private static class RecordingMetricsAwareFactory extends PlainFactory implements MetricsAware { + final AtomicInteger setMetricGroupCalls = new AtomicInteger(); + final AtomicReference receivedGroup = new AtomicReference<>(); + + RecordingMetricsAwareFactory(String scheme) { + super(scheme); + } + + @Override + public void setMetricGroup(MetricGroup metricGroup) { + setMetricGroupCalls.incrementAndGet(); + receivedGroup.set(metricGroup); + } + } + + private static class ThrowingMetricsAwareFactory extends PlainFactory implements MetricsAware { + ThrowingMetricsAwareFactory(String scheme) { + super(scheme); + } + + @Override + public void setMetricGroup(MetricGroup metricGroup) { + throw new RuntimeException("intentional failure from a misbehaving plugin"); + } + } + + /** + * Records the names of child groups created directly under it, sharing the list with children. + */ + private static class RecordingMetricGroup extends UnregisteredMetricsGroup { + final List childGroupNames; + + RecordingMetricGroup() { + this(Collections.synchronizedList(new ArrayList<>())); + } + + private RecordingMetricGroup(List childGroupNames) { + this.childGroupNames = childGroupNames; + } + + @Override + public MetricGroup addGroup(String name) { + childGroupNames.add(name); + return new RecordingMetricGroup(childGroupNames); + } + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index 2d7fea8532e24..8fa9e605b865d 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -26,6 +26,9 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.plugin.MetricsAware; +import org.apache.flink.fs.s3native.metrics.AwsSdkMetricBridge; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -38,6 +41,7 @@ import java.net.URI; import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -54,7 +58,7 @@ * @see org.apache.flink.core.fs.FileSystemFactory */ @Experimental -public class NativeS3FileSystemFactory implements FileSystemFactory { +public class NativeS3FileSystemFactory implements FileSystemFactory, MetricsAware { private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystemFactory.class); @@ -321,9 +325,53 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "When not set, the default chain is used: delegation tokens -> " + "static credentials (if configured) -> DefaultCredentialsProvider."); + public static final ConfigOption METRICS_ENABLED = + ConfigOptions.key("s3.metrics.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Master switch for publishing S3 operation metrics to Flink's metric " + + "system. When false, no metric publisher is attached to the SDK " + + "and no metric is registered. Metrics are only emitted under the " + + "TaskManager and JobManager entrypoints, which provide a " + + "process-level metric group; other contexts (CLI, etc.) emit " + + "none regardless of this setting."); + + public static final ConfigOption> METRICS_ALLOWLIST = + ConfigOptions.key("s3.metrics.allowlist") + .stringType() + .asList() + .defaultValues( + "api_call_count", + "api_call_duration_ms", + "throttle_count", + "retry_count", + "iops") + .withDescription( + "Names of S3 metrics to register. Replaces (does not merge with) the " + + "default list. Use \"*\" to register every metric the plugin " + + "emits. An empty list with s3.metrics.enabled=true is treated " + + "as misconfiguration: a warning is logged and the defaults are " + + "used. ('iops' is derived at reporter time from api_call_count " + + "and is not a separately registered metric.)"); + + public static final ConfigOption METRICS_HISTOGRAM_WINDOW_SIZE = + ConfigOptions.key("s3.metrics.histogram.window-size") + .intType() + .defaultValue(1024) + .withDescription( + "Reservoir size for S3 latency histograms (api_call_duration_ms). " + + "Bounds memory regardless of request volume. Must be positive."); + @Nullable private Configuration flinkConfig; @Nullable private BucketConfigProvider bucketConfigProvider; + /** Set via {@link #setMetricGroup(MetricGroup)}; null until {@code attachMetrics} fires. */ + @Nullable private volatile MetricGroup pluginMetrics; + + /** Lazily built once and shared across all clients created by this factory instance. */ + @Nullable private volatile AwsSdkMetricBridge metricBridge; + @Override public String getScheme() { return "s3"; @@ -341,6 +389,46 @@ public void configure(Configuration config) { this.bucketConfigProvider = new BucketConfigProvider(config); } + @Override + public synchronized void setMetricGroup(MetricGroup metricGroup) { + // filesystem_type label value is the scheme ("s3" / "s3a"). This is deliberate: s3:// and + // s3a:// are served by separate factory instances, so keeping the scheme as the label value + // lets their traffic be told apart, and sibling FS plugins register the same label key with + // their own scheme. May be called more than once (see MetricsAware); reset the cached + // bridge + // so a re-attach with a different group re-scopes metrics created afterwards. + this.pluginMetrics = metricGroup.addGroup("filesystem_type", getScheme()); + this.metricBridge = null; + } + + /** + * Returns the SDK metric publisher to attach to clients, or {@code null} when metrics are + * disabled or no metric group has been attached yet (e.g. CLI / embedded usage). The bridge is + * built lazily and cached so all clients of this factory share one set of metric handles. + */ + @Nullable + private AwsSdkMetricBridge resolveMetricBridge(Configuration config) { + final MetricGroup metrics = this.pluginMetrics; + if (metrics == null || !config.get(METRICS_ENABLED)) { + return null; + } + AwsSdkMetricBridge bridge = this.metricBridge; + if (bridge == null) { + synchronized (this) { + bridge = this.metricBridge; + if (bridge == null) { + bridge = + new AwsSdkMetricBridge( + metrics, + config.get(METRICS_ALLOWLIST), + config.get(METRICS_HISTOGRAM_WINDOW_SIZE)); + this.metricBridge = bridge; + } + } + } + return bridge; + } + @Override public FileSystem create(URI fsUri) throws IOException { Configuration config = this.flinkConfig; @@ -486,6 +574,7 @@ public FileSystem create(URI fsUri) throws IOException { .retryMaxBackoff(config.get(RETRY_MAX_BACKOFF)) .credentialsProviderClasses(credentialsProviderClasses) .encryptionConfig(encryptionConfig) + .metricPublisher(resolveMetricBridge(config)) .build(); NativeS3BulkCopyHelper bulkCopyHelper = null; diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java index e3c7a1f238054..46ed69a295ad6 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.metrics.MetricPublisher; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.retries.StandardRetryStrategy; @@ -365,6 +366,10 @@ public static class Builder { // Custom credentials provider class names (comma-separated) @Nullable private String credentialsProviderClasses; + // Optional AWS SDK metric publisher (e.g. the Flink metric bridge). Attached to both the + // sync and async clients via the shared ClientOverrideConfiguration. Null = no metrics. + @Nullable private MetricPublisher metricPublisher; + public Builder accessKey(@Nullable String accessKey) { this.accessKey = accessKey; return this; @@ -498,6 +503,11 @@ public Builder credentialsProviderClasses(@Nullable String credentialsProviderCl return this; } + public Builder metricPublisher(@Nullable MetricPublisher metricPublisher) { + this.metricPublisher = metricPublisher; + return this; + } + S3ClientProvider build() { if (endpoint == null) { endpoint = System.getProperty("s3.endpoint"); @@ -554,6 +564,13 @@ S3ClientProvider build() { .build()) .build(); + if (metricPublisher != null) { + // Re-wrap the immutable override config with the publisher attached. The same + // publisher feeds both the sync and async clients below. + overrideConfig = + overrideConfig.toBuilder().addMetricPublisher(metricPublisher).build(); + } + ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder() .maxConnections(maxConnections) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java new file mode 100644 index 0000000000000..9ae1ffb84f61d --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridge.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.metrics; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.ThreadSafeSimpleCounter; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.metrics.CoreMetric; +import software.amazon.awssdk.http.HttpMetric; +import software.amazon.awssdk.metrics.MetricCollection; +import software.amazon.awssdk.metrics.MetricPublisher; +import software.amazon.awssdk.metrics.SdkMetric; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Bridges AWS SDK v2's {@link MetricPublisher} into Flink metrics for {@code flink-s3-fs-native}. + * + *

The SDK invokes {@link #publish(MetricCollection)} asynchronously after every completed API + * call, on its internal completion executor. The bridge reads a small, fixed set of fields and + * emits the default metric surface of FLIP-576 against the {@code filesystem_type}-labelled scope + * it is handed at construction: + * + *

+ * + *

({@code iops} from the default allowlist is derived at reporter time as the rate of {@code + * api_call_count}, so it is not a separately registered metric.) + * + *

Allowlist. Only metrics whose name is in the allowlist passed at construction are + * registered; the rest are skipped on the hot path. A {@code "*"} entry registers everything. The + * default set is the five FLIP-576 metrics ({@code api_call_count}, {@code api_call_duration_ms}, + * {@code throttle_count}, {@code retry_count}, {@code iops}). + * + *

Cardinality. {@code op} comes from the SDK operation name (a closed set of ~15 values + * for S3), {@code status_class} is a closed classifier ({@code 2xx}, {@code 4xx}, {@code 5xx}, + * {@code throttled}, {@code other}, {@code error}, {@code unknown}), and {@code reason} is a closed + * enum ({@code throttled}, {@code 5xx}, {@code other}). Metric handles are cached in bounded maps, + * so {@link #publish} is a map lookup plus a counter increment with no per-record allocation. + * + *

Thread-safety. Counters use {@link ThreadSafeSimpleCounter} and the histogram is + * synchronized, so concurrent publishes from the SDK completion executor are safe. The bridge is + * also safe to share across multiple S3 clients of the same plugin instance. + */ +@Internal +public final class AwsSdkMetricBridge implements MetricPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(AwsSdkMetricBridge.class); + + static final String API_CALL_COUNT = "api_call_count"; + static final String API_CALL_DURATION_MS = "api_call_duration_ms"; + static final String THROTTLE_COUNT = "throttle_count"; + static final String RETRY_COUNT = "retry_count"; + static final String IOPS = "iops"; + + /** The default-on metric set from FLIP-576. {@code iops} is derived, not registered. */ + static final List DEFAULT_ALLOWLIST = + Arrays.asList(API_CALL_COUNT, API_CALL_DURATION_MS, THROTTLE_COUNT, RETRY_COUNT, IOPS); + + private static final String WILDCARD = "*"; + + private static final String LABEL_OP = "op"; + private static final String LABEL_STATUS_CLASS = "status_class"; + private static final String LABEL_REASON = "reason"; + + private static final String UNKNOWN_OP = "Unknown"; + + private final MetricGroup fsScope; + private final int histogramWindowSize; + + private final boolean allowAll; + private final Set allowlist; + + // op and label sets are closed, so these maps are bounded by construction. + private final ConcurrentHashMap counters = new ConcurrentHashMap<>(); + private final ConcurrentHashMap histograms = + new ConcurrentHashMap<>(); + + public AwsSdkMetricBridge(MetricGroup fsScope) { + this(fsScope, DEFAULT_ALLOWLIST, S3MetricHistogram.DEFAULT_WINDOW_SIZE); + } + + public AwsSdkMetricBridge(MetricGroup fsScope, int histogramWindowSize) { + this(fsScope, DEFAULT_ALLOWLIST, histogramWindowSize); + } + + public AwsSdkMetricBridge( + MetricGroup fsScope, @Nullable Collection allowlist, int histogramWindowSize) { + this.fsScope = Preconditions.checkNotNull(fsScope, "fsScope must not be null"); + Preconditions.checkArgument( + histogramWindowSize > 0, "histogramWindowSize must be positive"); + this.histogramWindowSize = histogramWindowSize; + + if (allowlist == null || allowlist.isEmpty()) { + LOG.warn( + "S3 metrics allowlist is empty; falling back to the default metric set {}", + DEFAULT_ALLOWLIST); + this.allowAll = false; + this.allowlist = new HashSet<>(DEFAULT_ALLOWLIST); + } else if (allowlist.contains(WILDCARD)) { + this.allowAll = true; + this.allowlist = new HashSet<>(); + } else { + this.allowAll = false; + this.allowlist = new HashSet<>(allowlist); + } + } + + private boolean allowed(String metricName) { + return allowAll || allowlist.contains(metricName); + } + + @Override + public void publish(MetricCollection apiCall) { + try { + translate(apiCall); + } catch (Throwable t) { + // Defence in depth: a metric failure must never affect S3 IO. + LOG.debug("Failed to publish S3 SDK metrics", t); + } + } + + private void translate(MetricCollection apiCall) { + final String op = first(apiCall, CoreMetric.OPERATION_NAME, UNKNOWN_OP); + + final Duration duration = first(apiCall, CoreMetric.API_CALL_DURATION, null); + if (duration != null && allowed(API_CALL_DURATION_MS)) { + histogram(op).update(duration.toMillis()); + } + + // HTTP_STATUS_CODE lives on the per-attempt children, not on the top-level ApiCall record. + // status_class reflects the overall outcome (last attempt); the retry reason reflects the + // failures that triggered the retries (any attempt), so they are tracked separately. + int throttleResponses = 0; + boolean sawServerError = false; + Integer lastStatus = null; + for (MetricCollection attempt : apiCall.children()) { + for (Integer status : attempt.metricValues(HttpMetric.HTTP_STATUS_CODE)) { + if (status != null) { + lastStatus = status; + if (isThrottle(status)) { + throttleResponses++; + } else if (status >= 500) { + sawServerError = true; + } + } + } + } + + final Boolean successful = first(apiCall, CoreMetric.API_CALL_SUCCESSFUL, null); + if (allowed(API_CALL_COUNT)) { + apiCallCount(op, statusClass(lastStatus, successful)).inc(); + } + + if (throttleResponses > 0 && allowed(THROTTLE_COUNT)) { + throttleCount(op).inc(throttleResponses); + } + + final Integer retries = first(apiCall, CoreMetric.RETRY_COUNT, 0); + if (retries != null && retries > 0 && allowed(RETRY_COUNT)) { + retryCount(op, retryReason(throttleResponses > 0, sawServerError)).inc(retries); + } + } + + private static boolean isThrottle(int status) { + return status == 429 || status == 503; + } + + private static String statusClass(Integer status, Boolean successful) { + if (status == null) { + if (Boolean.TRUE.equals(successful)) { + return "2xx"; + } + return Boolean.FALSE.equals(successful) ? "error" : "unknown"; + } + if (isThrottle(status)) { + return "throttled"; + } + if (status >= 200 && status < 300) { + return "2xx"; + } + if (status >= 400 && status < 500) { + return "4xx"; + } + if (status >= 500) { + return "5xx"; + } + return "other"; + } + + private static String retryReason(boolean sawThrottle, boolean sawServerError) { + if (sawThrottle) { + return "throttled"; + } + if (sawServerError) { + return "5xx"; + } + return "other"; + } + + private Counter apiCallCount(String op, String statusClass) { + return counters.computeIfAbsent( + "ac|" + op + "|" + statusClass, + k -> + fsScope.addGroup(LABEL_OP, op) + .addGroup(LABEL_STATUS_CLASS, statusClass) + .counter(API_CALL_COUNT, new ThreadSafeSimpleCounter())); + } + + private Counter throttleCount(String op) { + return counters.computeIfAbsent( + "th|" + op, + k -> + fsScope.addGroup(LABEL_OP, op) + .counter(THROTTLE_COUNT, new ThreadSafeSimpleCounter())); + } + + private Counter retryCount(String op, String reason) { + return counters.computeIfAbsent( + "rc|" + op + "|" + reason, + k -> + fsScope.addGroup(LABEL_OP, op) + .addGroup(LABEL_REASON, reason) + .counter(RETRY_COUNT, new ThreadSafeSimpleCounter())); + } + + private S3MetricHistogram histogram(String op) { + return histograms.computeIfAbsent( + op, + k -> + fsScope.addGroup(LABEL_OP, op) + .histogram( + API_CALL_DURATION_MS, + new S3MetricHistogram(histogramWindowSize))); + } + + private static T first(MetricCollection collection, SdkMetric metric, T defaultValue) { + final List values = collection.metricValues(metric); + return (values == null || values.isEmpty()) ? defaultValue : values.get(0); + } + + @Override + public void close() { + // The MetricGroup is owned by the runtime (see MetricsAware); the bridge holds no resources + // of its own, so close() is a no-op. This also keeps the bridge safe to share across S3 + // clients of the same plugin instance. + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java new file mode 100644 index 0000000000000..c2d5ecce0b552 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogram.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.metrics; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; + +import java.util.Arrays; + +/** + * Minimal sliding-window {@link Histogram} used by {@link AwsSdkMetricBridge} for {@code + * api_call_duration_ms}. + * + *

Backed by a fixed-size circular buffer holding the most recent {@code windowSize} samples + * (default {@value #DEFAULT_WINDOW_SIZE}). This bounds memory regardless of request volume and + * keeps {@link #update(long)} O(1); statistics are computed from a sorted snapshot of the window. + * + *

flink-s3-fs-native deliberately keeps a minimal dependency footprint and does not depend on + * flink-runtime, so {@code DescriptiveStatisticsHistogram} is not available; this is a small + * self-contained equivalent. + */ +@Internal +public class S3MetricHistogram implements Histogram { + + static final int DEFAULT_WINDOW_SIZE = 1024; + + private final long[] window; + private int size; // number of valid samples currently in the window + private int next; // next write position + private long count; // total samples ever recorded + + public S3MetricHistogram() { + this(DEFAULT_WINDOW_SIZE); + } + + public S3MetricHistogram(int windowSize) { + this.window = new long[windowSize]; + } + + @Override + public synchronized void update(long value) { + window[next] = value; + next = (next + 1) % window.length; + if (size < window.length) { + size++; + } + count++; + } + + @Override + public synchronized long getCount() { + return count; + } + + @Override + public synchronized HistogramStatistics getStatistics() { + return new WindowStatistics(Arrays.copyOf(window, size)); + } + + private static final class WindowStatistics extends HistogramStatistics { + + private final long[] sorted; + + WindowStatistics(long[] values) { + Arrays.sort(values); + this.sorted = values; + } + + @Override + public double getQuantile(double quantile) { + if (sorted.length == 0) { + return 0.0; + } + double pos = quantile * (sorted.length + 1); + if (pos < 1) { + return sorted[0]; + } + if (pos >= sorted.length) { + return sorted[sorted.length - 1]; + } + int lower = (int) pos; + double frac = pos - lower; + return sorted[lower - 1] + frac * (sorted[lower] - sorted[lower - 1]); + } + + @Override + public long[] getValues() { + return sorted; + } + + @Override + public int size() { + return sorted.length; + } + + @Override + public double getMean() { + if (sorted.length == 0) { + return 0.0; + } + long sum = 0; + for (long v : sorted) { + sum += v; + } + return (double) sum / sorted.length; + } + + @Override + public double getStdDev() { + if (sorted.length == 0) { + return 0.0; + } + final double mean = getMean(); + double sumSquares = 0.0; + for (long v : sorted) { + final double diff = v - mean; + sumSquares += diff * diff; + } + return Math.sqrt(sumSquares / sorted.length); + } + + @Override + public long getMax() { + return sorted.length == 0 ? 0L : sorted[sorted.length - 1]; + } + + @Override + public long getMin() { + return sorted.length == 0 ? 0L : sorted[0]; + } + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java new file mode 100644 index 0000000000000..9d0695807f93a --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/AwsSdkMetricBridgeTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.metrics; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.metrics.CoreMetric; +import software.amazon.awssdk.http.HttpMetric; +import software.amazon.awssdk.metrics.MetricCollection; +import software.amazon.awssdk.metrics.MetricCollector; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link AwsSdkMetricBridge}'s translation of SDK metric records into Flink metrics. */ +class AwsSdkMetricBridgeTest { + + @Test + void successfulCallIncrementsApiCallCountAndRecordsDuration() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root); + + bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 200)); + + assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L); + Histogram histogram = root.histograms.get("op=PutObject/api_call_duration_ms"); + assertThat(histogram).isNotNull(); + assertThat(histogram.getCount()).isEqualTo(1L); + assertThat(histogram.getStatistics().getMax()).isEqualTo(120L); + assertThat(root.counters).doesNotContainKey("op=PutObject/throttle_count"); + } + + @Test + void throttledCallIncrementsThrottleAndRetryCounts() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root); + + // Two throttled attempts (503) followed by a successful one (200); RETRY_COUNT = 2. + bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 503, 503, 200)); + + assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L); + assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L); + // The final attempt succeeded, so the overall call is classified 2xx. + assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L); + } + + @Test + void clientErrorIsClassifiedAs4xx() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root); + + bridge.publish(apiCall("HeadObject", Duration.ofMillis(20), false, 0, 404)); + + assertThat(root.count("op=HeadObject/status_class=4xx/api_call_count")).isEqualTo(1L); + assertThat(root.counters).doesNotContainKey("op=HeadObject/throttle_count"); + } + + @Test + void allowlistRegistersOnlyTheListedMetrics() { + CapturingMetricGroup root = new CapturingMetricGroup(); + // Only api_call_count is allowed; duration, throttle and retry must be skipped. + AwsSdkMetricBridge bridge = + new AwsSdkMetricBridge( + root, + Collections.singletonList(AwsSdkMetricBridge.API_CALL_COUNT), + S3MetricHistogram.DEFAULT_WINDOW_SIZE); + + bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 503, 503, 200)); + + assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L); + assertThat(root.histograms).doesNotContainKey("op=UploadPart/api_call_duration_ms"); + assertThat(root.counters).doesNotContainKey("op=UploadPart/throttle_count"); + assertThat(root.counters).doesNotContainKey("op=UploadPart/reason=throttled/retry_count"); + } + + @Test + void wildcardAllowlistRegistersEveryMetric() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = + new AwsSdkMetricBridge( + root, + Collections.singletonList("*"), + S3MetricHistogram.DEFAULT_WINDOW_SIZE); + + bridge.publish(apiCall("UploadPart", Duration.ofMillis(900), true, 2, 503, 503, 200)); + + assertThat(root.count("op=UploadPart/status_class=2xx/api_call_count")).isEqualTo(1L); + assertThat(root.histograms.get("op=UploadPart/api_call_duration_ms")).isNotNull(); + assertThat(root.count("op=UploadPart/throttle_count")).isEqualTo(2L); + assertThat(root.count("op=UploadPart/reason=throttled/retry_count")).isEqualTo(2L); + } + + @Test + void emptyAllowlistFallsBackToDefaults() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = + new AwsSdkMetricBridge( + root, Collections.emptyList(), S3MetricHistogram.DEFAULT_WINDOW_SIZE); + + bridge.publish(apiCall("PutObject", Duration.ofMillis(120), true, 0, 200)); + + // The five default metrics include api_call_count and api_call_duration_ms. + assertThat(root.count("op=PutObject/status_class=2xx/api_call_count")).isEqualTo(1L); + assertThat(root.histograms.get("op=PutObject/api_call_duration_ms")).isNotNull(); + } + + @Test + void serverErrorRetryIsClassifiedAs5xx() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root); + + bridge.publish(apiCall("GetObject", Duration.ofMillis(50), true, 1, 500, 200)); + + assertThat(root.count("op=GetObject/reason=5xx/retry_count")).isEqualTo(1L); + assertThat(root.counters).doesNotContainKey("op=GetObject/throttle_count"); + } + + @Test + void accumulatesAcrossMultipleCalls() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root); + + bridge.publish(apiCall("GetObject", Duration.ofMillis(10), true, 0, 200)); + bridge.publish(apiCall("GetObject", Duration.ofMillis(30), true, 0, 200)); + + assertThat(root.count("op=GetObject/status_class=2xx/api_call_count")).isEqualTo(2L); + Histogram histogram = root.histograms.get("op=GetObject/api_call_duration_ms"); + assertThat(histogram.getCount()).isEqualTo(2L); + assertThat(histogram.getStatistics().getMin()).isEqualTo(10L); + assertThat(histogram.getStatistics().getMax()).isEqualTo(30L); + } + + @Test + void publishOfEmptyCollectionDoesNotThrowAndUsesUnknownOp() { + CapturingMetricGroup root = new CapturingMetricGroup(); + AwsSdkMetricBridge bridge = new AwsSdkMetricBridge(root); + + bridge.publish(MetricCollector.create("ApiCall").collect()); + + assertThat(root.counters.keySet()).anyMatch(key -> key.contains("op=Unknown")); + } + + private static MetricCollection apiCall( + String op, Duration duration, boolean successful, int retries, int... attemptStatuses) { + MetricCollector apiCall = MetricCollector.create("ApiCall"); + apiCall.reportMetric(CoreMetric.OPERATION_NAME, op); + apiCall.reportMetric(CoreMetric.API_CALL_DURATION, duration); + apiCall.reportMetric(CoreMetric.API_CALL_SUCCESSFUL, successful); + apiCall.reportMetric(CoreMetric.RETRY_COUNT, retries); + for (int status : attemptStatuses) { + MetricCollector attempt = apiCall.createChild("ApiCallAttempt"); + attempt.reportMetric(HttpMetric.HTTP_STATUS_CODE, status); + } + return apiCall.collect(); + } + + /** A {@link MetricGroup} that captures registered metrics keyed by their full label path. */ + private static final class CapturingMetricGroup extends UnregisteredMetricsGroup { + + final Map counters; + final Map histograms; + private final String path; + + CapturingMetricGroup() { + this("", new HashMap<>(), new HashMap<>()); + } + + private CapturingMetricGroup( + String path, Map counters, Map histograms) { + this.path = path; + this.counters = counters; + this.histograms = histograms; + } + + long count(String key) { + Counter counter = counters.get(key); + assertThat(counter).as("counter %s", key).isNotNull(); + return counter.getCount(); + } + + @Override + public MetricGroup addGroup(String name) { + return new CapturingMetricGroup(child(name), counters, histograms); + } + + @Override + public MetricGroup addGroup(String key, String value) { + return new CapturingMetricGroup(child(key + "=" + value), counters, histograms); + } + + @Override + public C counter(String name, C counter) { + counters.put(child(name), counter); + return counter; + } + + @Override + public H histogram(String name, H histogram) { + histograms.put(child(name), histogram); + return histogram; + } + + private String child(String segment) { + return path.isEmpty() ? segment : path + "/" + segment; + } + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3FileSystemFactoryMetricsTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3FileSystemFactoryMetricsTest.java new file mode 100644 index 0000000000000..bdb9984550d4d --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3FileSystemFactoryMetricsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.metrics; + +import org.apache.flink.fs.s3native.NativeS3AFileSystemFactory; +import org.apache.flink.fs.s3native.NativeS3FileSystemFactory; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that the native S3 factories register metrics under a {@code filesystem_type} label whose + * value is the factory scheme, so {@code s3://} and {@code s3a://} traffic remain distinguishable. + */ +class NativeS3FileSystemFactoryMetricsTest { + + @Test + void s3FactoryUsesSchemeAsFilesystemTypeLabel() { + RecordingGroup group = new RecordingGroup(); + new NativeS3FileSystemFactory().setMetricGroup(group); + assertThat(group.keyedGroups).containsEntry("filesystem_type", "s3"); + } + + @Test + void s3aFactoryUsesSchemeAsFilesystemTypeLabel() { + RecordingGroup group = new RecordingGroup(); + new NativeS3AFileSystemFactory().setMetricGroup(group); + assertThat(group.keyedGroups).containsEntry("filesystem_type", "s3a"); + } + + private static final class RecordingGroup extends UnregisteredMetricsGroup { + final Map keyedGroups = new HashMap<>(); + + @Override + public MetricGroup addGroup(String key, String value) { + keyedGroups.put(key, value); + return this; + } + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java new file mode 100644 index 0000000000000..5e38eac641d3a --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/NativeS3MetricsEmissionITCase.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.metrics; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.fs.s3native.NativeS3FileSystemFactory; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.testutils.MetricListener; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; + +import java.io.FileNotFoundException; +import java.net.URI; +import java.time.Duration; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * End-to-end test proving that real S3 operations performed through {@code NativeS3FileSystem} are + * translated into Flink metrics by {@link AwsSdkMetricBridge} and become visible in a real Flink + * metric registry. + * + *

Unlike {@link AwsSdkMetricBridgeTest} (which drives the bridge with synthesized SDK records + * and a fake {@code MetricGroup}), this test exercises the full chain: the AWS SDK actually invokes + * the registered {@link software.amazon.awssdk.metrics.MetricPublisher} after each completed API + * call, the bridge registers and updates {@link Counter}/{@link Histogram} handles, and the + * assertions read those handles back through {@link MetricListener}'s real {@code MetricRegistry}. + * + *

Assertions use only GET/HEAD/LIST round trips, which carry no request body and are therefore + * unaffected by the request-checksum behaviour newer AWS SDK versions apply to {@code PutObject}. + * + *

Requires Docker; auto-skipped when Docker is unavailable. + */ +@Testcontainers(disabledWithoutDocker = true) +class NativeS3MetricsEmissionITCase { + + private static final String MINIO_IMAGE = "minio/minio:RELEASE.2022-02-07T08-17-33Z"; + private static final int MINIO_PORT = 9000; + private static final String ACCESS_KEY = "metricsAccessKey"; + private static final String SECRET_KEY = "metricsSecretKey"; + private static final String BUCKET = "flip576-metrics"; + + @Container + private static final GenericContainer MINIO = + new GenericContainer<>(MINIO_IMAGE) + .withEnv("MINIO_ROOT_USER", ACCESS_KEY) + .withEnv("MINIO_ROOT_PASSWORD", SECRET_KEY) + .withCommand("server", "/data") + .withExposedPorts(MINIO_PORT) + .waitingFor( + Wait.forHttp("/minio/health/ready") + .forPort(MINIO_PORT) + .withStartupTimeout(Duration.ofMinutes(2))); + + private static String endpoint() { + return String.format("http://%s:%d", MINIO.getHost(), MINIO.getMappedPort(MINIO_PORT)); + } + + @BeforeAll + static void createBucket() { + try (S3Client client = + S3Client.builder() + .endpointOverride(URI.create(endpoint())) + .region(Region.US_EAST_1) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY))) + .serviceConfiguration( + S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .build()) { + client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build()); + } + } + + @Test + void realS3OperationsEmitFlinkMetrics() throws Exception { + Configuration config = new Configuration(); + config.set(NativeS3FileSystemFactory.ENDPOINT, endpoint()); + config.set(NativeS3FileSystemFactory.ACCESS_KEY, ACCESS_KEY); + config.set(NativeS3FileSystemFactory.SECRET_KEY, SECRET_KEY); + config.set(NativeS3FileSystemFactory.REGION, "us-east-1"); + config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true); + config.set(NativeS3FileSystemFactory.CHUNKED_ENCODING_ENABLED, false); + config.set(NativeS3FileSystemFactory.CHECKSUM_VALIDATION_ENABLED, false); + config.set(NativeS3FileSystemFactory.METRICS_ENABLED, true); + + NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory(); + factory.configure(config); + + MetricListener metricListener = new MetricListener(); + // Mirror what FileSystem#attachMetrics hands to the factory: the "filesystem" child of the + // process-level group. + MetricGroup fsGroup = metricListener.getMetricGroup().addGroup("filesystem"); + factory.setMetricGroup(fsGroup); + + FileSystem fs = factory.create(URI.create("s3://" + BUCKET + "/")); + + // (1) A successful listing -> ListObjectsV2 (2xx). No request body, no checksum. + FileStatus[] listing = fs.listStatus(new Path("s3://" + BUCKET + "/")); + assertThat(listing).isNotNull(); + + // (2) A lookup of a key that does not exist -> HeadObject classified as an error (4xx). + try { + fs.getFileStatus(new Path("s3://" + BUCKET + "/does-not-exist-" + System.nanoTime())); + } catch (FileNotFoundException expected) { + // expected: the object is absent + } + + // The SDK publishes metrics after each completed call; the sync client typically publishes + // inline, but poll to remain robust against any asynchronous delivery. + CommonTestUtils.waitUtil( + () -> listObjectsSuccessCount(metricListener) > 0L, + Duration.ofSeconds(30), + "Expected a ListObjectsV2 api_call_count metric to be emitted by real S3 traffic"); + + // --- api_call_count (Counter) for the successful listing --- + long listCalls = listObjectsSuccessCount(metricListener); + assertThat(listCalls).as("ListObjectsV2 (2xx) api_call_count").isGreaterThan(0L); + + // --- api_call_duration_ms (Histogram) for the listing --- + Optional listDuration = + metricListener.getHistogram( + "filesystem", + "filesystem_type", + "s3", + "op", + "ListObjectsV2", + "api_call_duration_ms"); + assertThat(listDuration).as("ListObjectsV2 duration histogram").isPresent(); + assertThat(listDuration.get().getCount()).isGreaterThan(0L); + + // --- the failed lookup is recorded and classified as a client error (4xx) --- + long headErrorCalls = + counter( + metricListener, + "filesystem", + "filesystem_type", + "s3", + "op", + "HeadObject", + "status_class", + "4xx", + "api_call_count"); + assertThat(headErrorCalls) + .as("HeadObject (4xx) api_call_count for the missing key") + .isGreaterThan(0L); + } + + private static long listObjectsSuccessCount(MetricListener listener) { + return counter( + listener, + "filesystem", + "filesystem_type", + "s3", + "op", + "ListObjectsV2", + "status_class", + "2xx", + "api_call_count"); + } + + private static long counter(MetricListener listener, String... identifier) { + return listener.getCounter(identifier).map(Counter::getCount).orElse(0L); + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogramTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogramTest.java new file mode 100644 index 0000000000000..09782f2ebec62 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/metrics/S3MetricHistogramTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.metrics; + +import org.apache.flink.metrics.HistogramStatistics; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link S3MetricHistogram}. */ +class S3MetricHistogramTest { + + @Test + void recordsCountMinMaxAndMean() { + S3MetricHistogram histogram = new S3MetricHistogram(1024); + for (long value = 1; value <= 100; value++) { + histogram.update(value); + } + + assertThat(histogram.getCount()).isEqualTo(100L); + + HistogramStatistics stats = histogram.getStatistics(); + assertThat(stats.size()).isEqualTo(100); + assertThat(stats.getMin()).isEqualTo(1L); + assertThat(stats.getMax()).isEqualTo(100L); + assertThat(stats.getMean()).isEqualTo(50.5); + assertThat(stats.getQuantile(0.5)).isBetween(49.0, 52.0); + } + + @Test + void slidingWindowEvictsOldestBeyondCapacity() { + S3MetricHistogram histogram = new S3MetricHistogram(4); + for (long value = 1; value <= 10; value++) { + histogram.update(value); + } + + assertThat(histogram.getCount()).isEqualTo(10L); // total ever recorded + + HistogramStatistics stats = histogram.getStatistics(); + assertThat(stats.size()).isEqualTo(4); // only the most recent four retained + assertThat(stats.getMin()).isEqualTo(7L); + assertThat(stats.getMax()).isEqualTo(10L); + } + + @Test + void emptyHistogramReturnsZeroes() { + S3MetricHistogram histogram = new S3MetricHistogram(8); + + HistogramStatistics stats = histogram.getStatistics(); + assertThat(stats.size()).isZero(); + assertThat(stats.getMin()).isZero(); + assertThat(stats.getMax()).isZero(); + assertThat(stats.getMean()).isZero(); + assertThat(stats.getQuantile(0.99)).isZero(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index de80e508eeac3..1aa2141a66a23 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -377,6 +377,33 @@ protected void initializeServices(Configuration configuration, PluginManager plu configuration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.set(JobManagerOptions.PORT, commonRpcService.getPort()); + metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem); + + final RpcService metricQueryServiceRpcService = + MetricUtils.startRemoteMetricsRpcService( + configuration, + commonRpcService.getAddress(), + configuration.get(JobManagerOptions.BIND_HOST), + rpcSystem); + metricRegistry.startQueryService(metricQueryServiceRpcService, null); + + final String hostname = RpcUtils.getHostname(commonRpcService); + + processMetricGroup = + MetricUtils.instantiateProcessMetricGroup( + metricRegistry, + hostname, + ConfigurationUtils.getSystemResourceMetricsProbingInterval( + configuration)); + + // Second-phase init for file system plugins that opt into metrics (e.g. + // flink-s3-fs-native): hand them the process-level metric group before any file system + // is used. This must run ahead of the HA services and BlobServer below, because those + // may open external file systems (e.g. S3 HA/blob storage), creating them first would + // cache metric-less file system clients for the rest of the process lifetime. See + // FileSystem#attachMetrics and MetricsAware. + FileSystem.attachMetrics(processMetricGroup); + ioExecutor = Executors.newFixedThreadPool( ClusterEntrypointUtils.getPoolSize(configuration), @@ -400,24 +427,6 @@ protected void initializeServices(Configuration configuration, PluginManager plu configuration.set(BlobServerOptions.PORT, String.valueOf(blobServer.getPort())); heartbeatServices = createHeartbeatServices(configuration); failureEnrichers = FailureEnricherUtils.getFailureEnrichers(configuration); - metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem); - - final RpcService metricQueryServiceRpcService = - MetricUtils.startRemoteMetricsRpcService( - configuration, - commonRpcService.getAddress(), - configuration.get(JobManagerOptions.BIND_HOST), - rpcSystem); - metricRegistry.startQueryService(metricQueryServiceRpcService, null); - - final String hostname = RpcUtils.getHostname(commonRpcService); - - processMetricGroup = - MetricUtils.instantiateProcessMetricGroup( - metricRegistry, - hostname, - ConfigurationUtils.getSystemResourceMetricsProbingInterval( - configuration)); archivedApplicationStore = createArchivedApplicationStore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index c8f0e24f73218..1f872c2e809fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -640,6 +640,11 @@ public static TaskExecutor startTaskManager( resourceID, taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); + // Second-phase init for file system plugins that opt into metrics (e.g. + // flink-s3-fs-native): hand them the process-level metric group now that the + // MetricRegistry exists. See FileSystem#attachMetrics and MetricsAware. + FileSystem.attachMetrics(taskManagerMetricGroup.f0); + final ExecutorService ioExecutor = Executors.newFixedThreadPool( taskManagerServicesConfiguration.getNumIoThreads(),