Skip to content

Conversation

@wenbinhuang9
Copy link

@wenbinhuang9 wenbinhuang9 commented Oct 20, 2025

Summary:

These changes enable the periodic publishing of global metrics from non-main threads (async callbacks, background tasks) to Flink's metrics system. The updateGlobalMetrics() method is the key addition that allows a background timer to push accumulated metrics to Flink at regular intervals.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

Copy link

@minxhe minxhe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple questions

public static synchronized void updateGlobalMetrics() {
FlinkMetricContainer container = GLOBAL_FLINK_METRIC_CONTAINER.get();
if (container == null) {
LOG.warn("Cannot update global metrics: FlinkMetricContainer reference not set");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this scenario happen?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is possible, since we use dedicated thread to updateGlobalMetircs, and setGlobalMetrics is inited in a different thread. It is possible that the gloableMetrics is not inited and then we call updateGlobalMetircs

* update metrics concurrently, which could lead to incorrect metric values or duplicate metric
* registrations in Flink's metrics system.
*/
public static synchronized void updateGlobalMetrics() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can updateGlobalMetrics get called before setGlobalMetrics?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is possible, since we use dedicated thread to updateGlobalMetircs, and setGlobalMetrics is inited in a different thread. It is possible that the gloableMetrics is not inited and then we call updateGlobalMetircs

}

try {
container.updateMetrics(GLOBAL_CONTAINER_STEP_NAME);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does container.updateMetrics gets called in the current implementation?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be called at beam-runner library. Will share you PR.

private static final Logger LOG = LoggerFactory.getLogger(GlobalMetricsUtils.class);
private static final String GLOBAL_CONTAINER_STEP_NAME = "GLOBAL_METRICS";

// Maintain a reference to the FlinkMetricContainer for updating global metrics
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add more context on why this is needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants