From 0dcc8fde4390e1c1b04c93ee277fb749cca7cf50 Mon Sep 17 00:00:00 2001 From: Elia LIU Date: Fri, 27 Mar 2026 00:34:41 +1100 Subject: [PATCH 1/2] [python] Expand SDF in PortableRunner default optimization Enable translations.expand_sdf in PortableRunner's default pre-optimization path so Python Read transforms are expanded for portable runners like Spark. Also add optimizer coverage for default SDF expansion, explicit pre_optimize=expand_sdf, and bounded Read expansion. Refs #24422. --- .../runners/portability/portable_runner.py | 7 +- .../portability/portable_runner_test.py | 154 ++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 94a467d5a249..2152a39d858b 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -316,6 +316,10 @@ def _optimize_pipeline( # Eventually remove the 'lift_combiners' phase from 'default'. translations.pack_combiners, translations.lift_combiners, + # Expand SDF so that portable runners that don't support SDFs + # natively (e.g. Spark) can still parallelize Read transforms. + # See https://github.com/apache/beam/issues/24422 + translations.expand_sdf, translations.sort_stages ] partial = True @@ -332,7 +336,8 @@ def _optimize_pipeline( phases = [] for phase_name in pre_optimize.split(','): # For now, these are all we allow. 
- if phase_name in ('pack_combiners', 'lift_combiners'): + if phase_name in ( + 'pack_combiners', 'lift_combiners', 'expand_sdf'): phases.append(getattr(translations, phase_name)) else: raise ValueError( diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index 31293a4d43ec..b54063ecfc81 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -460,6 +460,160 @@ def create_options(self): return options +class PortableRunnerOptimizationTest(unittest.TestCase): + """Tests for PortableRunner._optimize_pipeline.""" + + def test_default_optimize_expands_sdf(self): + """Verify that expand_sdf is applied in the default pre_optimize setting. + + See https://github.com/apache/beam/issues/24422. + """ + from apache_beam.portability import common_urns + from apache_beam.io import restriction_trackers + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): + def initial_restriction(self, element): + return restriction_trackers.OffsetRange(0, len(element)) + + def create_tracker(self, restriction): + return restriction_trackers.OffsetRestrictionTracker(restriction) + + def restriction_size(self, element, restriction): + return restriction.size() + + class ExpandingStringsDoFn(beam.DoFn): + def process( + self, + element, + restriction_tracker=beam.DoFn.RestrictionParam( + ExpandStringsProvider())): + cur = restriction_tracker.current_restriction().start + while restriction_tracker.try_claim(cur): + yield element[cur] + cur += 1 + + p = beam.Pipeline() + _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn())) + proto = p.to_runner_api() + + # Default options (no pre_optimize experiment set). 
+ options = PipelineOptions() + optimized = PortableRunner._optimize_pipeline(proto, options) + + transform_urns = set() + for t in optimized.components.transforms.values(): + if t.spec.urn: + transform_urns.add(t.spec.urn) + + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components + .PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + + def test_custom_optimize_expand_sdf(self): + """Verify that expand_sdf can be requested explicitly.""" + from apache_beam.portability import common_urns + from apache_beam.io import restriction_trackers + + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): + def initial_restriction(self, element): + return restriction_trackers.OffsetRange(0, len(element)) + + def create_tracker(self, restriction): + return restriction_trackers.OffsetRestrictionTracker(restriction) + + def restriction_size(self, element, restriction): + return restriction.size() + + class ExpandingStringsDoFn(beam.DoFn): + def process( + self, + element, + restriction_tracker=beam.DoFn.RestrictionParam( + ExpandStringsProvider())): + cur = restriction_tracker.current_restriction().start + while restriction_tracker.try_claim(cur): + yield element[cur] + cur += 1 + + p = beam.Pipeline() + _ = (p | beam.Create(['abc']) | beam.ParDo(ExpandingStringsDoFn())) + proto = p.to_runner_api() + + options = PipelineOptions(['--experiments=pre_optimize=expand_sdf']) + optimized = PortableRunner._optimize_pipeline(proto, options) + + transform_urns = set() + for t in optimized.components.transforms.values(): + if t.spec.urn: + transform_urns.add(t.spec.urn) + + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + 
common_urns.sdf_components + .PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + + def test_default_optimize_expands_bounded_read(self): + """Verify that iobase.Read(BoundedSource) is expanded by default. + + This is the end-to-end scenario from + https://github.com/apache/beam/issues/24422: Read transforms like + ReadFromParquet use SDFs internally. Without expand_sdf in the default + optimization, these arrive at the Spark job server as a single ParDo, + executing on one partition with no parallelization. + """ + from apache_beam.portability import common_urns + from apache_beam.io import iobase + + class _FakeBoundedSource(iobase.BoundedSource): + def get_range_tracker(self, start_position, stop_position): + return None + + def read(self, range_tracker): + return iter([]) + + def estimate_size(self): + return 0 + + p = beam.Pipeline() + _ = p | beam.io.Read(_FakeBoundedSource()) + proto = p.to_runner_api() + + # Default options (no pre_optimize experiment set). + options = PipelineOptions() + optimized = PortableRunner._optimize_pipeline(proto, options) + + transform_urns = set() + for t in optimized.components.transforms.values(): + if t.spec.urn: + transform_urns.add(t.spec.urn) + + # The SDFBoundedSourceReader DoFn should have been expanded into + # SDF component stages. + self.assertIn( + common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn, transform_urns) + self.assertIn( + common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, + transform_urns) + self.assertIn( + common_urns.sdf_components + .PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + transform_urns) + # Reshuffle should be present to enable parallelization. 
+ self.assertIn( + common_urns.composites.RESHUFFLE.urn, transform_urns) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() From 2641d1f9bded59da7cca4778f1b068eb16c32923 Mon Sep 17 00:00:00 2001 From: Elia LIU Date: Fri, 27 Mar 2026 01:05:27 +1100 Subject: [PATCH 2/2] [python] Fix formatting for PortableRunner SDF optimization --- .../runners/portability/portable_runner.py | 3 +-- .../portability/portable_runner_test.py | 19 +++++++------------ 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 2152a39d858b..c386d90da19a 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -336,8 +336,7 @@ def _optimize_pipeline( phases = [] for phase_name in pre_optimize.split(','): # For now, these are all we allow. - if phase_name in ( - 'pack_combiners', 'lift_combiners', 'expand_sdf'): + if phase_name in ('pack_combiners', 'lift_combiners', 'expand_sdf'): phases.append(getattr(translations, phase_name)) else: raise ValueError( diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py index b54063ecfc81..b1197e8141ee 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py @@ -462,14 +462,13 @@ def create_options(self): class PortableRunnerOptimizationTest(unittest.TestCase): """Tests for PortableRunner._optimize_pipeline.""" - def test_default_optimize_expands_sdf(self): """Verify that expand_sdf is applied in the default pre_optimize setting. See https://github.com/apache/beam/issues/24422. 
""" - from apache_beam.portability import common_urns from apache_beam.io import restriction_trackers + from apache_beam.portability import common_urns class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): def initial_restriction(self, element): @@ -511,14 +510,13 @@ def process( common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, transform_urns) self.assertIn( - common_urns.sdf_components - .PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, transform_urns) def test_custom_optimize_expand_sdf(self): """Verify that expand_sdf can be requested explicitly.""" - from apache_beam.portability import common_urns from apache_beam.io import restriction_trackers + from apache_beam.portability import common_urns class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): def initial_restriction(self, element): @@ -559,8 +557,7 @@ def process( common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, transform_urns) self.assertIn( - common_urns.sdf_components - .PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, transform_urns) def test_default_optimize_expands_bounded_read(self): @@ -572,8 +569,8 @@ def test_default_optimize_expands_bounded_read(self): optimization, these arrive at the Spark job server as a single ParDo, executing on one partition with no parallelization. 
""" - from apache_beam.portability import common_urns from apache_beam.io import iobase + from apache_beam.portability import common_urns class _FakeBoundedSource(iobase.BoundedSource): def get_range_tracker(self, start_position, stop_position): @@ -606,12 +603,10 @@ def estimate_size(self): common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn, transform_urns) self.assertIn( - common_urns.sdf_components - .PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, + common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn, transform_urns) # Reshuffle should be present to enable parallelization. - self.assertIn( - common_urns.composites.RESHUFFLE.urn, transform_urns) + self.assertIn(common_urns.composites.RESHUFFLE.urn, transform_urns) if __name__ == '__main__':