From ffad8579a1d489a92137f30b1cd50cca785686a5 Mon Sep 17 00:00:00 2001 From: Wenbin Huang Date: Thu, 16 Oct 2025 15:52:14 -0700 Subject: [PATCH 1/4] Add updateGlobalMetric methods --- .../flink/metrics/GlobalMetricsUtils.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java index 11bae4d894a2..9fd1ab990fcc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java @@ -17,12 +17,20 @@ */ package org.apache.beam.runners.flink.metrics; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utility class for managing global metrics in a Flink environment. */ public class GlobalMetricsUtils { + private static final Logger LOG = LoggerFactory.getLogger(GlobalMetricsUtils.class); private static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS"; + // Maintain a reference to the FlinkMetricContainer for updating global metrics + private static final AtomicReference GLOBAL_FLINK_METRIC_CONTAINER = + new AtomicReference<>(null); + /** * Sets the global metrics container if it is not already set. * @@ -32,6 +40,35 @@ public static synchronized void setGlobalMetrics(FlinkMetricContainer flinkMetri if (MetricsEnvironment.getGlobalContainer().get() == null) { MetricsEnvironment.setGlobalContainer( flinkMetricContainer.getMetricsContainer(GLOBAL_CONTAINER_STEP_NAME)); + // Store the FlinkMetricContainer reference for later use + GLOBAL_FLINK_METRIC_CONTAINER.set(flinkMetricContainer); + LOG.debug("Set global FlinkMetricContainer reference"); + } + } + + /** + * Updates and publishes global metrics to Flink's metrics system. + * + *

This method retrieves the stored FlinkMetricContainer reference and publishes accumulated + * metrics from async callback threads or other non-main-thread operations to Flink's metrics + * framework. + * + *

This method is synchronized to prevent race conditions when multiple threads attempt to + * update metrics concurrently, which could lead to incorrect metric values or duplicate metric + * registrations in Flink's metrics system. + */ + public static synchronized void updateGlobalMetrics() { + FlinkMetricContainer container = GLOBAL_FLINK_METRIC_CONTAINER.get(); + if (container == null) { + LOG.warn("Cannot update global metrics: FlinkMetricContainer reference not set"); + return; + } + + try { + container.updateMetrics(GLOBAL_CONTAINER_STEP_NAME); + LOG.debug("Successfully updated global metrics"); + } catch (Exception e) { + LOG.warn("Failed to update global metrics", e); } } } From 1a9fb7fa807d511235a21854016b46bc86fbfa51 Mon Sep 17 00:00:00 2001 From: Wenbin Huang Date: Tue, 28 Oct 2025 13:45:11 -0700 Subject: [PATCH 2/4] fix a warning at GlobalMetricsUtils --- .../apache/beam/runners/flink/metrics/GlobalMetricsUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java index 9fd1ab990fcc..04896ccc7ad1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java @@ -29,9 +29,9 @@ public class GlobalMetricsUtils { // Maintain a reference to the FlinkMetricContainer for updating global metrics private static final AtomicReference GLOBAL_FLINK_METRIC_CONTAINER = - new AtomicReference<>(null); + new AtomicReference<>(); - /** +/** * Sets the global metrics container if it is not already set. * * @param flinkMetricContainer The Flink metric container to set as the global container. From 768924890dd3545b319526d87c478904e3de1f3c Mon Sep 17 00:00:00 2001 From: Wenbin Huang Date: Tue, 28 Oct 2025 14:13:18 -0700 Subject: [PATCH 3/4] fix format issue --- .../apache/beam/runners/flink/metrics/GlobalMetricsUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java index 04896ccc7ad1..e4928c4dce29 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java @@ -31,7 +31,7 @@ public class GlobalMetricsUtils { private static final AtomicReference GLOBAL_FLINK_METRIC_CONTAINER = new AtomicReference<>(); -/** + /** * Sets the global metrics container if it is not already set. * * @param flinkMetricContainer The Flink metric container to set as the global container. From 883bf319d9383880783c36b481e16434f9c7409d Mon Sep 17 00:00:00 2001 From: Wenbin Huang Date: Thu, 13 Nov 2025 13:55:31 -0800 Subject: [PATCH 4/4] add additional comments why using flink container --- .../beam/runners/flink/metrics/GlobalMetricsUtils.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java index e4928c4dce29..8681174ddf2d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java @@ -27,7 +27,10 @@ public class GlobalMetricsUtils { private static final Logger LOG = LoggerFactory.getLogger(GlobalMetricsUtils.class); private static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS"; - // Maintain a reference to the FlinkMetricContainer for updating global metrics + // Maintain a reference to the FlinkMetricContainer for updating global metrics. + // This is required because Beam's global metrics need to be published to Flink's metrics system. + // The FlinkMetricContainer bridges Beam's metrics API with Flink's native metrics framework, + // allowing metrics from async operations to be properly reported to Flink's metric reporters. private static final AtomicReference GLOBAL_FLINK_METRIC_CONTAINER = new AtomicReference<>();