diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java index 11bae4d894a2..8681174ddf2d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/GlobalMetricsUtils.java @@ -17,12 +17,23 @@ */ package org.apache.beam.runners.flink.metrics; +import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utility class for managing global metrics in a Flink environment. */ public class GlobalMetricsUtils { + private static final Logger LOG = LoggerFactory.getLogger(GlobalMetricsUtils.class); private static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS"; + // Maintain a reference to the FlinkMetricContainer for updating global metrics. + // This is required because Beam's global metrics need to be published to Flink's metrics system. + // The FlinkMetricContainer bridges Beam's metrics API with Flink's native metrics framework, + // allowing metrics from async operations to be properly reported to Flink's metric reporters. + private static final AtomicReference GLOBAL_FLINK_METRIC_CONTAINER = + new AtomicReference<>(); + /** * Sets the global metrics container if it is not already set. * @@ -32,6 +43,35 @@ public static synchronized void setGlobalMetrics(FlinkMetricContainer flinkMetri if (MetricsEnvironment.getGlobalContainer().get() == null) { MetricsEnvironment.setGlobalContainer( flinkMetricContainer.getMetricsContainer(GLOBAL_CONTAINER_STEP_NAME)); + // Store the FlinkMetricContainer reference for later use + GLOBAL_FLINK_METRIC_CONTAINER.set(flinkMetricContainer); + LOG.debug("Set global FlinkMetricContainer reference"); + } + } + + /** + * Updates and publishes global metrics to Flink's metrics system. + * + *

This method retrieves the stored FlinkMetricContainer reference and publishes accumulated + * metrics from async callback threads or other non-main-thread operations to Flink's metrics + * framework. + * + *

This method is synchronized to prevent race conditions when multiple threads attempt to + * update metrics concurrently, which could lead to incorrect metric values or duplicate metric + * registrations in Flink's metrics system. + */ + public static synchronized void updateGlobalMetrics() { + FlinkMetricContainer container = GLOBAL_FLINK_METRIC_CONTAINER.get(); + if (container == null) { + LOG.warn("Cannot update global metrics: FlinkMetricContainer reference not set"); + return; + } + + try { + container.updateMetrics(GLOBAL_CONTAINER_STEP_NAME); + LOG.debug("Successfully updated global metrics"); + } catch (Exception e) { + LOG.warn("Failed to update global metrics", e); } } }