Skip to content

[FLINK-39546][s3] Improve observability in flink-s3-fs-native by exposing operation-level S3 metrics#28427

Open
Samrat002 wants to merge 1 commit into
apache:masterfrom
Samrat002:metrics-system
Open

[FLINK-39546][s3] Improve observability in flink-s3-fs-native by exposing operation-level S3 metrics#28427
Samrat002 wants to merge 1 commit into
apache:masterfrom
Samrat002:metrics-system

Conversation

@Samrat002

@Samrat002 Samrat002 commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

What is the purpose of the change

flink-s3-fs-native currently emits no metrics. When a job's checkpoints, savepoints, or sinks go through it, operators have no visibility into how Flink is actually talking to S3: request volume, latency, throttling, or retries, which makes diagnosing slow or failing checkpoints largely guesswork.

This change makes the native S3 filesystem report operation-level S3 metrics into Flink's metric system. It does so by bridging the AWS SDK's built-in metrics SPI into Flink Counter/Histogram instruments, so every completed S3 API call is counted, timed, and classified.

More details on : FLIP-576

Brief change log

flink-core

  • Add MetricsAware (@PublicEvolving, org.apache.flink.core.plugin): a FileSystemFactory implements it to be handed a MetricGroup.
  • Add FileSystem.attachMetrics(MetricGroup) (@Internal): creates a filesystem child group and forwards it to every registered MetricsAware factory; resilient to a misbehaving factory and idempotent.
  • PluginFileSystemFactory now implements MetricsAware and forwards setMetricGroup to the wrapped inner factory under the plugin classloader. Without this, plugin-loaded filesystems (the normal deployment mode) would silently never receive the group.

flink-runtime

  • ClusterEntrypoint (JobManager) and TaskManagerRunner (TaskManager) call FileSystem.attachMetrics(processMetricGroup) during startup. The ClusterEntrypoint service-init order was adjusted so this runs before HA/blob services cache filesystem clients, otherwise those early clients would be created without a metric group.

flink-s3-fs-native

  • NativeS3FileSystemFactory / NativeS3AFileSystemFactory implement MetricsAware and tag metrics with a filesystem_type label set to the scheme (s3 vs s3a), so the two stay distinguishable.
  • AwsSdkMetricBridge implements software.amazon.awssdk.metrics.MetricPublisher and translates each MetricCollection into Flink metrics: api_call_count (labels: op, status_class), api_call_duration_ms (histogram, label op), throttle_count (label op), retry_count (labels: op, reason).
  • S3MetricHistogram: a bounded sliding-window histogram backing the duration metric.
  • S3ClientProvider registers the publisher on the sync/async clients.
  • New config options: s3.metrics.enabled (off by default), s3.metrics.allowlist, s3.metrics.histogram.window-size.

Verifying this change

This change added tests and can be verified as follows.

Automated tests

  • AwsSdkMetricBridgeTest — translation of SDK records to Flink metrics; status_class classification (2xx/4xx/5xx/throttled); retry attribution; allowlist behavior (explicit list, * wildcard, empty → defaults).
  • S3MetricHistogramTest — sliding-window statistics.
  • NativeS3FileSystemFactoryMetricsTest — the filesystem_type label resolves to s3 / s3a per factory.
  • FileSystemAttachMetricsTest (flink-core) — attachMetrics unwraps PluginFileSystemFactory to reach the real factory, skips non-MetricsAware factories, survives a throwing factory, and is idempotent.
  • NativeS3MetricsEmissionITCase — MinIO via Testcontainers; real GET/HEAD/LIST round trips, asserting the counters/histograms are readable back through a real MetricRegistry (MetricListener). Auto-skips without Docker.

Manual end-to-end against real AWS S3

I ran a standalone cluster built from this branch with s3.metrics.enabled: true and the SLF4J reporter, and submitted a large-state streaming job checkpointing to s3://<bucket>/checkpoints (HashMap backend, filesystem checkpoint storage, 10 s interval). The native plugin loaded (Plugin loader ... s3-fs-native), built its client via the SDK default credential chain, and wrote real checkpoint objects to S3. The reporter then showed the metrics on both the TaskManager (data-plane writes) and the JobManager (checkpoint coordination / multipart):

# TaskManager
...filesystem.filesystem_type.s3.op.PutObject.status_class.2xx.api_call_count: 31
...filesystem.filesystem_type.s3.op.PutObject.api_call_duration_ms: count=31, min=343, max=8329, mean=3383.2, p99=8329.0
...filesystem.filesystem_type.s3.op.ListObjectsV2.status_class.2xx.api_call_count: 31
...filesystem.filesystem_type.s3.op.ListObjectsV2.reason.other.retry_count: 1
...filesystem.filesystem_type.s3.op.HeadObject.status_class.4xx.api_call_count: 25

# JobManager
...filesystem.filesystem_type.s3.op.CreateMultipartUpload.status_class.2xx.api_call_count: 25
...filesystem.filesystem_type.s3.op.UploadPart.status_class.2xx.api_call_count: 25
...filesystem.filesystem_type.s3.op.CompleteMultipartUpload.status_class.2xx.api_call_count: 25
...filesystem.filesystem_type.s3.op.DeleteObject.status_class.2xx.api_call_count: 48
...filesystem.filesystem_type.s3.op.HeadObject.status_class.2xx.api_call_count: 48

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: yes, native-s3-fs

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented?
    this change introduces the feature, followup documentation is up next.

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

@flinkbot

flinkbot commented Jun 13, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants