@@ -17,12 +17,23 @@
*/
package org.apache.beam.runners.flink.metrics;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility class for managing global metrics in a Flink environment. */
public class GlobalMetricsUtils {
  private static final Logger LOG = LoggerFactory.getLogger(GlobalMetricsUtils.class);
  private static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS";

  // Maintain a reference to the FlinkMetricContainer for updating global metrics.
  // This is required because Beam's global metrics need to be published to Flink's metrics system.
  // The FlinkMetricContainer bridges Beam's metrics API with Flink's native metrics framework,
  // allowing metrics from async operations to be properly reported to Flink's metric reporters.
  private static final AtomicReference<FlinkMetricContainer> GLOBAL_FLINK_METRIC_CONTAINER =
      new AtomicReference<>();

  /**
   * Sets the global metrics container if it is not already set.
   *
@@ -32,6 +43,35 @@ public static synchronized void setGlobalMetrics(FlinkMetricContainer flinkMetri
    if (MetricsEnvironment.getGlobalContainer().get() == null) {
      MetricsEnvironment.setGlobalContainer(
          flinkMetricContainer.getMetricsContainer(GLOBAL_CONTAINER_STEP_NAME));
      // Store the FlinkMetricContainer reference for later use
      GLOBAL_FLINK_METRIC_CONTAINER.set(flinkMetricContainer);
      LOG.debug("Set global FlinkMetricContainer reference");
    }
  }

  /**
   * Updates and publishes global metrics to Flink's metrics system.
   *
   * <p>This method retrieves the stored FlinkMetricContainer reference and publishes accumulated
   * metrics from async callback threads or other non-main-thread operations to Flink's metrics
   * framework.
   *
   * <p>This method is synchronized to prevent race conditions when multiple threads attempt to
   * update metrics concurrently, which could lead to incorrect metric values or duplicate metric
   * registrations in Flink's metrics system.
   */
  public static synchronized void updateGlobalMetrics() {
can updateGlobalMetrics get called before setGlobalMetrics?

Author

Yes, it is possible. A dedicated thread calls updateGlobalMetrics, while setGlobalMetrics is initialized on a different thread, so updateGlobalMetrics can run before the global metrics are initialized.

    FlinkMetricContainer container = GLOBAL_FLINK_METRIC_CONTAINER.get();
    if (container == null) {
      LOG.warn("Cannot update global metrics: FlinkMetricContainer reference not set");
can this scenario happen?

Author

Yes, it is possible. A dedicated thread calls updateGlobalMetrics, while setGlobalMetrics is initialized on a different thread, so updateGlobalMetrics can run before the global metrics are initialized.

      return;
    }

    try {
      container.updateMetrics(GLOBAL_CONTAINER_STEP_NAME);
Where does container.updateMetrics get called in the current implementation?

Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be called in the beam-runner library. I will share the PR with you.

      LOG.debug("Successfully updated global metrics");
    } catch (Exception e) {
      LOG.warn("Failed to update global metrics", e);
    }
  }
}
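
The review thread above turns on one ordering question: a dedicated thread may call updateGlobalMetrics before another thread has called setGlobalMetrics. The following is a minimal, self-contained sketch of that set-once / guarded-update pattern, not the code from this PR. `StubContainer`, `GuardedGlobalMetrics`, and `GuardedGlobalMetricsDemo` are hypothetical stand-ins (no Beam or Flink classes are used), and the sketch swaps in `AtomicReference.compareAndSet` for the set-once check, whereas the PR relies on a synchronized method that tests `MetricsEnvironment.getGlobalContainer()`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for FlinkMetricContainer; it only counts publish calls. */
final class StubContainer {
  private final AtomicInteger publishCount = new AtomicInteger();

  void updateMetrics(String stepName) {
    publishCount.incrementAndGet();
  }

  int publishCount() {
    return publishCount.get();
  }
}

/** Sketch of the set-once / guarded-update pattern discussed in the review thread. */
final class GuardedGlobalMetrics {
  private static final String STEP_NAME = "GLOBAL_METRICS";
  private static final AtomicReference<StubContainer> CONTAINER = new AtomicReference<>();

  private GuardedGlobalMetrics() {}

  /** Stores the container only if none is stored yet; returns true if this call stored it. */
  static boolean set(StubContainer container) {
    return CONTAINER.compareAndSet(null, container);
  }

  /** Publishes if a container is available; otherwise does nothing and returns false. */
  static synchronized boolean update() {
    StubContainer container = CONTAINER.get();
    if (container == null) {
      return false; // update ran before set: skip this round instead of failing
    }
    container.updateMetrics(STEP_NAME);
    return true;
  }

  /** Helper for tests and demos: clears the stored reference. */
  static void reset() {
    CONTAINER.set(null);
  }
}

final class GuardedGlobalMetricsDemo {
  public static void main(String[] args) throws InterruptedException {
    StubContainer container = new StubContainer();

    // The updater thread runs to completion before the container is set.
    Thread updater =
        new Thread(() -> System.out.println("update before set: " + GuardedGlobalMetrics.update()));
    updater.start();
    updater.join();

    GuardedGlobalMetrics.set(container);
    System.out.println("update after set: " + GuardedGlobalMetrics.update());
    System.out.println("publish count: " + container.publishCount());
  }
}
```

In this sketch an early update is a silent no-op that reports `false`, so a periodic updater can simply try again on its next tick. Whether that case deserves a warning-level log, as in the PR, or something quieter depends on how often the updater thread is expected to win the race.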