From fb633047f88a7c847489fcce78d8e758be933a29 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 3 Jan 2026 14:19:38 -0500 Subject: [PATCH] fix: Enable `create_test` to correctly parse and apply external providers defined in YAML pipeline specifications. --- sdks/python/apache_beam/yaml/yaml_testing.py | 21 +++++++--- .../apache_beam/yaml/yaml_testing_test.py | 39 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_testing.py b/sdks/python/apache_beam/yaml/yaml_testing.py index ad31afa927e0..ead3ab9de319 100644 --- a/sdks/python/apache_beam/yaml/yaml_testing.py +++ b/sdks/python/apache_beam/yaml/yaml_testing.py @@ -411,6 +411,13 @@ def create_test( **yaml_transform.SafeLineLoader.strip_metadata( pipeline_spec.get('options', {}))) + providers = yaml_provider.merge_providers( + yaml_provider.parse_providers('', pipeline_spec.get('providers', [])), + { + 'AssertEqualAndRecord': yaml_provider.as_provider_list( + 'AssertEqualAndRecord', AssertEqualAndRecord) + }) + def get_name(transform): if 'name' in transform: return str(transform['name']) @@ -428,7 +435,8 @@ def get_name(transform): mock_outputs = [{ 'name': get_name(t), 'elements': [ - _try_row_as_dict(row) for row in _first_n(t, options, max_num_inputs) + _try_row_as_dict(row) + for row in _first_n(t, options, max_num_inputs, providers) ], } for t in input_transforms] @@ -504,15 +512,18 @@ def record(element): return pcoll | beam.Map(record) -def _first_n(transform_spec, options, n): +def _first_n(transform_spec, options, n, providers=None): recorder = RecordElements(n) + if providers is None: + providers = { + 'AssertEqualAndRecord': yaml_provider.as_provider_list( + 'AssertEqualAndRecord', AssertEqualAndRecord) + } try: with beam.Pipeline(options=options) as p: _ = ( p - | yaml_transform.YamlTransform( - transform_spec, - providers={'AssertEqualAndRecord': AssertEqualAndRecord}) + | yaml_transform.YamlTransform(transform_spec, providers=providers) | recorder) except _DoneException: pass diff --git a/sdks/python/apache_beam/yaml/yaml_testing_test.py b/sdks/python/apache_beam/yaml/yaml_testing_test.py index 9bb0e64b6db5..70e9246e4d3e 100644 --- a/sdks/python/apache_beam/yaml/yaml_testing_test.py +++ b/sdks/python/apache_beam/yaml/yaml_testing_test.py @@ -356,6 +356,45 @@ def test_toplevel_providers(self): }] }) + def test_create_with_external_providers(self): + """Test that create_test works with external providers defined in the + pipeline spec. + + This test validates the fix for issue #37136 where external providers + defined in YAML files were not recognized when running tests. + """ + pipeline = ''' + pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - {a: 1, b: 2} + - {a: 2, b: 3} + - {a: 3, b: 4} + - {a: 4, b: 5} + - {a: 5, b: 6} + - type: MyCustomTransform + - type: LogForTesting + providers: + - type: yaml + transforms: + MyCustomTransform: + body: + type: MapToFields + config: + language: python + fields: + sum_ab: a + b + ''' + test_spec = yaml_testing.create_test( + pipeline, max_num_inputs=10, min_num_outputs=3) + + self.assertEqual(len(test_spec['expected_inputs']), 1) + self.assertGreaterEqual(len(test_spec['expected_inputs'][0]['elements']), 3) + yaml_testing.run_test(pipeline, test_spec) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)