From ac5742772a725a1436ba99adc1addd68bc2fdb68 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 26 Mar 2026 18:39:00 +0000 Subject: [PATCH 1/4] refactor: refine benchmark class layout in bench_eval_type.py --- python/benchmarks/bench_eval_type.py | 310 +++++++++++---------------- 1 file changed, 126 insertions(+), 184 deletions(-) diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py index a489d5d15ce5..db68d00bfaf0 100644 --- a/python/benchmarks/bench_eval_type.py +++ b/python/benchmarks/bench_eval_type.py @@ -350,29 +350,6 @@ def _build_non_grouped_scenarios(): return scenarios -_NON_GROUPED_SCENARIOS = _build_non_grouped_scenarios() - - -class _NonGroupedBenchMixin: - """Provides ``_write_scenario`` for non-grouped Arrow eval types. - - Subclasses set ``_eval_type``, ``_scenarios``, and ``_udfs``. - UDF entries with ``ret_type=None`` inherit ``col0_type`` from the scenario. - """ - - def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, col0_type = self._scenarios[scenario] - udf_func, ret_type, arg_offsets = self._udfs[udf_name] - if ret_type is None: - ret_type = col0_type - _write_worker_input( - self._eval_type, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b), - buf, - ) - - # -- SQL_ARROW_BATCHED_UDF -------------------------------------------------- # Arrow-optimized Python UDF: receives individual Python values per row, # returns a single Python value. The wire protocol includes an extra @@ -430,25 +407,24 @@ def _build_arrow_batched_scenarios(): return scenarios -_ARROW_BATCHED_SCENARIOS = _build_arrow_batched_scenarios() - - -# UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values. -# arg_offsets=[0] means the UDF receives column 0 value per row. -_ARROW_BATCHED_UDFS = { - "identity_udf": (lambda x: x, None, [0]), - "stringify_udf": (lambda x: str(x), StringType(), [0]), - "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]), -} - - class _ArrowBatchedBenchMixin: """Provides ``_write_scenario`` for SQL_ARROW_BATCHED_UDF. - Like ``_NonGroupedBenchMixin`` but writes the extra ``input_type`` - (StructType JSON) that the wire protocol requires. + Writes the extra ``input_type`` (StructType JSON) that the wire protocol + requires before the UDF payload. """ + _scenarios = _build_arrow_batched_scenarios() + # UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values. + # arg_offsets=[0] means the UDF receives column 0 value per row. + _udfs = { + "identity_udf": (lambda x: x, None, [0]), + "stringify_udf": (lambda x: str(x), StringType(), [0]), + "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]), + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): batch, num_batches, input_struct, col0_type = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] @@ -469,17 +445,11 @@ def write_command(b): class ArrowBatchedUDFTimeBench(_ArrowBatchedBenchMixin, _TimeBenchBase): - _scenarios = _ARROW_BATCHED_SCENARIOS - _udfs = _ARROW_BATCHED_UDFS - params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)] - param_names = ["scenario", "udf"] + pass class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase): - _scenarios = _ARROW_BATCHED_SCENARIOS - _udfs = _ARROW_BATCHED_UDFS - params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)] - param_names = ["scenario", "udf"] + pass # --------------------------------------------------------------------------- @@ -577,12 +547,6 @@ def _grouped_agg_arrow_mean_multi(col0, col1): return (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) -_GROUPED_AGG_ARROW_UDFS = { - "sum_udf": _grouped_agg_arrow_sum, - "mean_multi_udf": _grouped_agg_arrow_mean_multi, -} - - # -- SQL_GROUPED_AGG_ARROW_ITER_UDF ------------------------------------------ # UDF receives Iterator[pa.Array] (or Iterator[Tuple[pa.Array, ...]]) per group, # returns scalar. @@ -608,12 +572,6 @@ def _grouped_agg_arrow_iter_mean_multi(batch_iter): return total -_GROUPED_AGG_ARROW_ITER_UDFS = { - "sum_udf": _grouped_agg_arrow_iter_sum, - "mean_multi_udf": _grouped_agg_arrow_iter_mean_multi, -} - - def _make_agg_arrow_groups( num_groups: int, rows_per_group: int, @@ -658,12 +616,17 @@ def _build_grouped_agg_arrow_scenarios(): return scenarios -_GROUPED_AGG_ARROW_SCENARIOS = _build_grouped_agg_arrow_scenarios() - - class _GroupedAggArrowBenchMixin: """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_UDF.""" + _scenarios = _build_grouped_agg_arrow_scenarios() + _udfs = { + "sum_udf": _grouped_agg_arrow_sum, + "mean_multi_udf": _grouped_agg_arrow_mean_multi, + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): groups, n_cols = self._scenarios[scenario] udf_func = self._udfs[udf_name] @@ -688,22 +651,23 @@ def write_command(b): class GroupedAggArrowUDFTimeBench(_GroupedAggArrowBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedAggArrowUDFPeakmemBench(_GroupedAggArrowBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass -class _GroupedAggArrowIterBenchMixin: +class _GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin): """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_ITER_UDF.""" + _udfs = { + "sum_udf": _grouped_agg_arrow_iter_sum, + "mean_multi_udf": _grouped_agg_arrow_iter_mean_multi, + } + params = [_GroupedAggArrowBenchMixin.params[0], list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): groups, n_cols = self._scenarios[scenario] udf_func = self._udfs[udf_name] @@ -728,17 +692,11 @@ def write_command(b): class GroupedAggArrowIterUDFTimeBench(_GroupedAggArrowIterBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_ITER_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedAggArrowIterUDFPeakmemBench(_GroupedAggArrowIterBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_ITER_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_GROUPED_MAP_ARROW_UDF ------------------------------------------------ @@ -763,13 +721,6 @@ def _grouped_map_arrow_filter(table): return table.filter(pc.is_valid(table.column(0))) -_GROUPED_MAP_ARROW_UDFS = { - "identity_udf": _grouped_map_arrow_identity, - "sort_udf": _grouped_map_arrow_sort, - "filter_udf": _grouped_map_arrow_filter, -} - - def _build_grouped_map_arrow_scenarios(): """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF. @@ -794,12 +745,18 @@ def _build_grouped_map_arrow_scenarios(): return scenarios -_GROUPED_MAP_ARROW_SCENARIOS = _build_grouped_map_arrow_scenarios() - - class _GroupedMapArrowBenchMixin: """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_UDF.""" + _scenarios = _build_grouped_map_arrow_scenarios() + _udfs = { + "identity_udf": _grouped_map_arrow_identity, + "sort_udf": _grouped_map_arrow_sort, + "filter_udf": _grouped_map_arrow_filter, + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): groups, return_type, arg_offsets = self._scenarios[scenario] udf_func = self._udfs[udf_name] @@ -826,17 +783,11 @@ def write_command(b): class GroupedMapArrowUDFTimeBench(_GroupedMapArrowBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedMapArrowUDFPeakmemBench(_GroupedMapArrowBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_GROUPED_MAP_ARROW_ITER_UDF ------------------------------------------ @@ -864,16 +815,17 @@ def _grouped_map_arrow_iter_filter(batches): yield batch.filter(pc.is_valid(batch.column(0))) -_GROUPED_MAP_ARROW_ITER_UDFS = { - "identity_udf": _grouped_map_arrow_iter_identity, - "sort_udf": _grouped_map_arrow_iter_sort, - "filter_udf": _grouped_map_arrow_iter_filter, -} - - -class _GroupedMapArrowIterBenchMixin: +class _GroupedMapArrowIterBenchMixin(_GroupedMapArrowBenchMixin): """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF.""" + _udfs = { + "identity_udf": _grouped_map_arrow_iter_identity, + "sort_udf": _grouped_map_arrow_iter_sort, + "filter_udf": _grouped_map_arrow_iter_filter, + } + params = [_GroupedMapArrowBenchMixin.params[0], list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): groups, return_type, arg_offsets = self._scenarios[scenario] udf_func = self._udfs[udf_name] @@ -899,17 +851,11 @@ def write_command(b): class GroupedMapArrowIterUDFTimeBench(_GroupedMapArrowIterBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_ITER_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_ITER_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_GROUPED_MAP_PANDAS_UDF ------------------------------------------------ @@ -939,18 +885,6 @@ def _build_grouped_map_pandas_scenarios(): return scenarios -_GROUPED_MAP_PANDAS_SCENARIOS = _build_grouped_map_pandas_scenarios() - -# Each UDF entry: (func, ret_type, n_args). -# ret_type=None means "use the input schema" (excluding key columns for n_args=2). -# n_args=1 -> func(pdf), n_args=2 -> func(key, pdf). -_GROUPED_MAP_PANDAS_UDFS = { - "identity_udf": (lambda df: df, None, 1), - "sort_udf": (lambda df: df.sort_values(df.columns[0]), None, 1), - "key_identity_udf": (lambda key, df: df, None, 2), -} - - class _GroupedMapPandasBenchMixin: """Provides ``_write_scenario`` for SQL_GROUPED_MAP_PANDAS_UDF. @@ -958,6 +892,18 @@ class _GroupedMapPandasBenchMixin: Groups are written as separate Arrow IPC streams. """ + _scenarios = _build_grouped_map_pandas_scenarios() + # Each UDF entry: (func, ret_type, n_args). + # ret_type=None means "use the input schema" (excluding key columns for n_args=2). + # n_args=1 -> func(pdf), n_args=2 -> func(key, pdf). + _udfs = { + "identity_udf": (lambda df: df, None, 1), + "sort_udf": (lambda df: df.sort_values(df.columns[0]), None, 1), + "key_identity_udf": (lambda key, df: df, None, 2), + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): batch, num_groups, schema, max_rpb = self._scenarios[scenario] udf_func, ret_type, n_args = self._udfs[udf_name] @@ -981,17 +927,11 @@ def _write_scenario(self, scenario, udf_name, buf): class GroupedMapPandasUDFTimeBench(_GroupedMapPandasBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS - _udfs = _GROUPED_MAP_PANDAS_UDFS - params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), list(_GROUPED_MAP_PANDAS_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedMapPandasUDFPeakmemBench(_GroupedMapPandasBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS - _udfs = _GROUPED_MAP_PANDAS_UDFS - params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), list(_GROUPED_MAP_PANDAS_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------ @@ -1019,13 +959,6 @@ def _filter_batch_iter(it): yield batch.filter(mask) -_MAP_ARROW_ITER_UDFS = { - "identity_udf": (_identity_batch_iter, None, [0]), - "sort_udf": (_sort_batch_iter, None, [0]), - "filter_udf": (_filter_batch_iter, None, [0]), -} - - def _build_map_arrow_iter_scenarios(): """Build scenarios for SQL_MAP_ARROW_ITER_UDF. @@ -1073,9 +1006,6 @@ def _build_map_arrow_iter_scenarios(): return scenarios -_MAP_ARROW_ITER_SCENARIOS = _build_map_arrow_iter_scenarios() - - def _wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch: """Wrap all columns into a single struct column, mimicking JVM-side encoding.""" struct_array = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names) @@ -1085,10 +1015,19 @@ def _wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch: class _MapArrowIterBenchMixin: """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF. - Like ``_NonGroupedBenchMixin`` but wraps input batches in a struct column - to match the JVM-side wire format (``flatten_struct`` undoes this). + Wraps input batches in a struct column to match the JVM-side wire format + (``flatten_struct`` undoes this). """ + _scenarios = _build_map_arrow_iter_scenarios() + _udfs = { + "identity_udf": (_identity_batch_iter, None, [0]), + "sort_udf": (_sort_batch_iter, None, [0]), + "filter_udf": (_filter_batch_iter, None, [0]), + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): batch, num_batches, col0_type = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] @@ -1104,17 +1043,11 @@ def _write_scenario(self, scenario, udf_name, buf): class MapArrowIterUDFTimeBench(_MapArrowIterBenchMixin, _TimeBenchBase): - _scenarios = _MAP_ARROW_ITER_SCENARIOS - _udfs = _MAP_ARROW_ITER_UDFS - params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass class MapArrowIterUDFPeakmemBench(_MapArrowIterBenchMixin, _PeakmemBenchBase): - _scenarios = _MAP_ARROW_ITER_SCENARIOS - _udfs = _MAP_ARROW_ITER_UDFS - params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_SCALAR_ARROW_UDF --------------------------------------------------- @@ -1134,28 +1067,39 @@ def _nullcheck_arrow(c): return pc.is_valid(c) -# ret_type=None means "use col0_type from the scenario" -_SCALAR_ARROW_UDFS = { - "identity_udf": (lambda c: c, None, [0]), - "sort_udf": (_sort_arrow, None, [0]), - "nullcheck_udf": (_nullcheck_arrow, BooleanType(), [0]), -} +class _ScalarArrowBenchMixin: + """Mixin for SQL_SCALAR_ARROW_UDF benchmarks.""" - -class ScalarArrowUDFTimeBench(_NonGroupedBenchMixin, _TimeBenchBase): _eval_type = PythonEvalType.SQL_SCALAR_ARROW_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_UDFS)] + _scenarios = _build_non_grouped_scenarios() + # ret_type=None means "use col0_type from the scenario" + _udfs = { + "identity_udf": (lambda c: c, None, [0]), + "sort_udf": (_sort_arrow, None, [0]), + "nullcheck_udf": (_nullcheck_arrow, BooleanType(), [0]), + } + params = [list(_scenarios), list(_udfs)] param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): + batch, num_batches, col0_type = self._scenarios[scenario] + udf_func, ret_type, arg_offsets = self._udfs[udf_name] + if ret_type is None: + ret_type = col0_type + _write_worker_input( + self._eval_type, + lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b), + buf, + ) + -class ScalarArrowUDFPeakmemBench(_NonGroupedBenchMixin, _PeakmemBenchBase): - _eval_type = PythonEvalType.SQL_SCALAR_ARROW_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_UDFS)] - param_names = ["scenario", "udf"] +class ScalarArrowUDFTimeBench(_ScalarArrowBenchMixin, _TimeBenchBase): + pass + + +class ScalarArrowUDFPeakmemBench(_ScalarArrowBenchMixin, _PeakmemBenchBase): + pass # -- SQL_SCALAR_ARROW_ITER_UDF ---------------------------------------------- @@ -1180,24 +1124,22 @@ def _nullcheck_iter(it): yield pc.is_valid(c) -_SCALAR_ARROW_ITER_UDFS = { - "identity_udf": (_identity_iter, None, [0]), - "sort_udf": (_sort_iter, None, [0]), - "nullcheck_udf": (_nullcheck_iter, BooleanType(), [0]), -} +class _ScalarArrowIterBenchMixin(_ScalarArrowBenchMixin): + """Mixin for SQL_SCALAR_ARROW_ITER_UDF benchmarks.""" - -class ScalarArrowIterUDFTimeBench(_NonGroupedBenchMixin, _TimeBenchBase): _eval_type = PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_ITER_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_ITER_UDFS)] + _udfs = { + "identity_udf": (_identity_iter, None, [0]), + "sort_udf": (_sort_iter, None, [0]), + "nullcheck_udf": (_nullcheck_iter, BooleanType(), [0]), + } + params = [list(_ScalarArrowBenchMixin._scenarios), list(_udfs)] param_names = ["scenario", "udf"] -class ScalarArrowIterUDFPeakmemBench(_NonGroupedBenchMixin, _PeakmemBenchBase): - _eval_type = PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_ITER_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] +class ScalarArrowIterUDFTimeBench(_ScalarArrowIterBenchMixin, _TimeBenchBase): + pass + + +class ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBase): + pass From 6709d3d9e454a37a9e6afc1c66218d9d0d966091 Mon Sep 17 00:00:00 2001 From: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 26 Mar 2026 20:44:32 +0000 Subject: [PATCH 2/4] refactor: extract MockProtocolWriter and MockDataFactory util classes --- python/benchmarks/bench_eval_type.py | 633 ++++++++++++++------------- 1 file changed, 321 insertions(+), 312 deletions(-) diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py index db68d00bfaf0..21871259e71d 100644 --- a/python/benchmarks/bench_eval_type.py +++ b/python/benchmarks/bench_eval_type.py @@ -51,175 +51,297 @@ from pyspark.worker import main as worker_main # --------------------------------------------------------------------------- -# Wire-format helpers +# Mock protocol I/O helpers # --------------------------------------------------------------------------- -def _write_utf8(s: str, buf: io.BytesIO) -> None: - """Write a length-prefixed UTF-8 string (matches ``UTF8Deserializer.loads``).""" - encoded = s.encode("utf-8") - write_int(len(encoded), buf) - buf.write(encoded) - - -def _write_bool(val: bool, buf: io.BytesIO) -> None: - buf.write(struct.pack("!?", val)) +class MockProtocolWriter: + """Constructs the binary wire protocol that ``worker.py``'s ``main()`` expects.""" + + @staticmethod + def write_utf8(s: str, buf: io.BytesIO) -> None: + """Write a length-prefixed UTF-8 string (matches ``UTF8Deserializer.loads``).""" + encoded = s.encode("utf-8") + write_int(len(encoded), buf) + buf.write(encoded) + + @staticmethod + def write_bool(val: bool, buf: io.BytesIO) -> None: + buf.write(struct.pack("!?", val)) + + @staticmethod + def build_preamble(buf: io.BytesIO) -> None: + """Write everything ``main()`` reads before ``eval_type``.""" + write_int(0, buf) # split_index + MockProtocolWriter.write_utf8( + f"{sys.version_info[0]}.{sys.version_info[1]}", buf + ) # python version + MockProtocolWriter.write_utf8( + json.dumps( + { + "isBarrier": False, + "stageId": 0, + "partitionId": 0, + "attemptNumber": 0, + "taskAttemptId": 0, + "cpus": 1, + "resources": {}, + "localProperties": {}, + } + ), + buf, + ) + MockProtocolWriter.write_utf8("/tmp", buf) # spark_files_dir + write_int(0, buf) # num_python_includes + MockProtocolWriter.write_bool(False, buf) # needs_broadcast_decryption_server + write_int(0, buf) # num_broadcast_variables + + @staticmethod + def build_udf_payload( + udf_func: Callable[..., Any], + return_type: StructType, + arg_offsets: list[int], + buf: io.BytesIO, + ) -> None: + """Write the ``read_single_udf`` portion of the protocol.""" + write_int(1, buf) # num_udfs + write_int(len(arg_offsets), buf) # num_arg + for offset in arg_offsets: + write_int(offset, buf) + MockProtocolWriter.write_bool(False, buf) # is_kwarg + write_int(1, buf) # num_chained + command = cloudpickle_dumps((udf_func, return_type)) + write_int(len(command), buf) + buf.write(command) + write_long(0, buf) # result_id + + @staticmethod + def write_arrow_ipc_batches( + batch_iter: Iterator[pa.RecordBatch], buf: io.BufferedIOBase + ) -> None: + """Write a plain Arrow IPC stream from an iterator of Arrow batches.""" + first_batch = next(batch_iter) + writer = pa.RecordBatchStreamWriter(buf, first_batch.schema) + writer.write_batch(first_batch) + for batch in batch_iter: + writer.write_batch(batch) + writer.close() + + @staticmethod + def write_worker_input( + eval_type: int, + write_command: Callable[[io.BufferedIOBase], None], + write_data: Callable[[io.BufferedIOBase], None], + buf: io.BufferedIOBase, + ) -> None: + """Write the full worker binary stream: preamble + command + data + end.""" + MockProtocolWriter.build_preamble(buf) + write_int(eval_type, buf) + write_int(0, buf) # RunnerConf (0 key-value pairs) + write_int(0, buf) # EvalConf (0 key-value pairs) + write_command(buf) + write_data(buf) + write_int(-4, buf) # SpecialLengths.END_OF_STREAM + + @staticmethod + def run_worker_from_replayed_file( + write_input: Callable[[io.BufferedIOBase], None], + ) -> None: + """Write input to a temp file, then replay it through ``worker_main``.""" + fd, path = tempfile.mkstemp(prefix="spark-bench-replay-", suffix=".bin") + try: + with os.fdopen(fd, "w+b") as infile: + write_input(infile) + infile.flush() + infile.seek(0) + worker_main(infile, io.BytesIO()) + finally: + try: + os.remove(path) + except FileNotFoundError: + pass + + @staticmethod + def write_grouped_arrow_data( + groups: list[tuple[pa.RecordBatch, ...]], + num_dfs: int, + buf: io.BufferedIOBase, + max_records_per_batch: int | None = None, + ) -> None: + """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes.""" + for group in groups: + write_int(num_dfs, buf) + for df_batch in group: + if max_records_per_batch and df_batch.num_rows > max_records_per_batch: + sub_batches = [ + df_batch.slice(offset, max_records_per_batch) + for offset in range(0, df_batch.num_rows, max_records_per_batch) + ] + MockProtocolWriter.write_arrow_ipc_batches(iter(sub_batches), buf) + else: + MockProtocolWriter.write_arrow_ipc_batches(iter([df_batch]), buf) + write_int(0, buf) # end of groups + + @staticmethod + def build_grouped_arg_offsets(n_cols, n_keys=0): + """``[len, num_keys, key_col_0, ..., val_col_0, ...]``""" + keys = list(range(n_keys)) + vals = list(range(n_keys, n_cols)) + offsets = [n_keys] + keys + vals + return [len(offsets)] + offsets + + @staticmethod + def make_grouped_arg_offsets(num_key_cols: int, num_value_cols: int) -> list[int]: + """Build arg_offsets for grouped map UDFs. + + Format: [total_len, num_keys, key_idx..., value_idx...] + All columns are in a single struct, so key indexes come first, + then value indexes. + """ + total = num_key_cols + num_value_cols + 1 # +1 for the num_keys prefix + key_idxs = list(range(num_key_cols)) + value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols)) + return [total, num_key_cols] + key_idxs + value_idxs # --------------------------------------------------------------------------- -# Worker protocol builder +# Mock data payload helpers # --------------------------------------------------------------------------- -def _build_preamble(buf: io.BytesIO) -> None: - """Write everything ``main()`` reads before ``eval_type``.""" - write_int(0, buf) # split_index - _write_utf8(f"{sys.version_info[0]}.{sys.version_info[1]}", buf) # python version - _write_utf8( - json.dumps( - { - "isBarrier": False, - "stageId": 0, - "partitionId": 0, - "attemptNumber": 0, - "taskAttemptId": 0, - "cpus": 1, - "resources": {}, - "localProperties": {}, - } - ), - buf, - ) - _write_utf8("/tmp", buf) # spark_files_dir - write_int(0, buf) # num_python_includes - _write_bool(False, buf) # needs_broadcast_decryption_server - write_int(0, buf) # num_broadcast_variables - - -def _build_udf_payload( - udf_func: Callable[..., Any], - return_type: StructType, - arg_offsets: list[int], - buf: io.BytesIO, -) -> None: - """Write the ``read_single_udf`` portion of the protocol.""" - write_int(1, buf) # num_udfs - write_int(len(arg_offsets), buf) # num_arg - for offset in arg_offsets: - write_int(offset, buf) - _write_bool(False, buf) # is_kwarg - write_int(1, buf) # num_chained - command = cloudpickle_dumps((udf_func, return_type)) - write_int(len(command), buf) - buf.write(command) - write_long(0, buf) # result_id - - -def _write_arrow_ipc_batches(batch_iter: Iterator[pa.RecordBatch], buf: io.BufferedIOBase) -> None: - """Write a plain Arrow IPC stream from an iterator of Arrow batches.""" - first_batch = next(batch_iter) - writer = pa.RecordBatchStreamWriter(buf, first_batch.schema) - writer.write_batch(first_batch) - for batch in batch_iter: - writer.write_batch(batch) - writer.close() - - -def _write_worker_input( - eval_type: int, - write_command: Callable[[io.BufferedIOBase], None], - write_data: Callable[[io.BufferedIOBase], None], - buf: io.BufferedIOBase, -) -> None: - """Write the full worker binary stream: preamble + command + data + end. - - This is the general skeleton shared by all eval types. Callers provide - *write_command* (e.g. ``_build_udf_payload``) and *write_data* - (e.g. ``_write_arrow_ipc_batches``) to plug in protocol specifics. - """ - _build_preamble(buf) - write_int(eval_type, buf) - write_int(0, buf) # RunnerConf (0 key-value pairs) - write_int(0, buf) # EvalConf (0 key-value pairs) - write_command(buf) - write_data(buf) - write_int(-4, buf) # SpecialLengths.END_OF_STREAM - - -def _run_worker_from_replayed_file(write_input: Callable[[io.BufferedIOBase], None]) -> None: - """Write input to a temp file, then replay it through ``worker_main``.""" - fd, path = tempfile.mkstemp(prefix="spark-bench-replay-", suffix=".bin") - try: - with os.fdopen(fd, "w+b") as infile: - write_input(infile) - infile.flush() - infile.seek(0) - worker_main(infile, io.BytesIO()) - finally: - try: - os.remove(path) - except FileNotFoundError: - pass - - -def _make_typed_batch(rows: int, n_cols: int) -> tuple[pa.RecordBatch, IntegerType]: - """Columns cycling through int64, string, binary, boolean — reflects realistic serde costs.""" - type_cycle = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - ] - arrays = [type_cycle[i % len(type_cycle)][0](rows) for i in range(n_cols)] - fields = [StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) for i in range(n_cols)] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - IntegerType(), - ) +class MockDataFactory: + """Creates mock Arrow batches and group structures for benchmarks.""" + + @staticmethod + def make_typed_batch(rows: int, n_cols: int) -> tuple[pa.RecordBatch, IntegerType]: + """Columns cycling through int64, string, binary, boolean.""" + type_cycle = [ + (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), + (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), + (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), + (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), + ] + arrays = [type_cycle[i % len(type_cycle)][0](rows) for i in range(n_cols)] + fields = [ + StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) for i in range(n_cols) + ] + return ( + pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), + IntegerType(), + ) + @staticmethod + def make_grouped_batch(rows_per_group, n_cols): + """``group_key (int64)`` + ``(n_cols - 1)`` float32 value columns.""" + arrays = [pa.array(np.zeros(rows_per_group, dtype=np.int64))] + [ + pa.array(np.random.rand(rows_per_group).astype(np.float32)) + for _ in range(n_cols - 1) + ] + fields = [StructField("group_key", IntegerType())] + [ + StructField(f"val_{i}", DoubleType()) for i in range(n_cols - 1) + ] + return ( + pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), + StructType(fields), + ) -def _make_grouped_batch(rows_per_group, n_cols): - """``group_key (int64)`` + ``(n_cols - 1)`` float32 value columns.""" - arrays = [pa.array(np.zeros(rows_per_group, dtype=np.int64))] + [ - pa.array(np.random.rand(rows_per_group).astype(np.float32)) for _ in range(n_cols - 1) - ] - fields = [StructField("group_key", IntegerType())] + [ - StructField(f"val_{i}", DoubleType()) for i in range(n_cols - 1) - ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - StructType(fields), - ) + @staticmethod + def make_mixed_batch(rows_per_group): + """``id``, ``str_col``, ``float_col``, ``double_col``, ``long_col``.""" + arrays = [ + pa.array(np.zeros(rows_per_group, dtype=np.int64)), + pa.array([f"s{j}" for j in range(rows_per_group)]), + pa.array(np.random.rand(rows_per_group).astype(np.float32)), + pa.array(np.random.rand(rows_per_group)), + pa.array(np.zeros(rows_per_group, dtype=np.int64)), + ] + fields = [ + StructField("id", IntegerType()), + StructField("str_col", StringType()), + StructField("float_col", DoubleType()), + StructField("double_col", DoubleType()), + StructField("long_col", IntegerType()), + ] + return ( + pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), + StructType(fields), + ) + @staticmethod + def make_pure_batch(rows, n_cols, make_array, spark_type): + """Create a batch where all columns share the same Arrow type.""" + arrays = [make_array(rows) for _ in range(n_cols)] + fields = [StructField(f"col_{i}", spark_type) for i in range(n_cols)] + return pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]) + + @staticmethod + def make_grouped_batches( + num_groups: int, + rows_per_group: int, + num_key_cols: int, + num_value_cols: int, + ) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]: + """Create grouped data: each group is a single batch wrapped in a struct column. + + Returns (groups, return_type) where each group is a 1-tuple of a + struct-wrapped RecordBatch suitable for ArrowStreamGroupUDFSerializer. + """ + n_cols = num_key_cols + num_value_cols + type_cycle = [ + (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), LongType()), + (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), + (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), + (lambda r: pa.array(np.random.rand(r)), DoubleType()), + ] + + groups = [] + for g in range(num_groups): + arrays = [ + type_cycle[i % len(type_cycle)][0](rows_per_group) for i in range(n_cols) + ] + names = [f"col_{i}" for i in range(n_cols)] + batch = pa.RecordBatch.from_arrays(arrays, names=names) + # Wrap in struct to match JVM-side encoding (flatten_struct undoes this) + struct_array = pa.StructArray.from_arrays(batch.columns, names=names) + wrapped = pa.RecordBatch.from_arrays([struct_array], names=["_0"]) + groups.append((wrapped,)) + + value_fields = [ + StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) + for i in range(num_key_cols, n_cols) + ] + return_type = StructType(value_fields) + + return groups, return_type + + @staticmethod + def make_agg_arrow_groups( + num_groups: int, + rows_per_group: int, + n_cols: int, + ) -> list[tuple[pa.RecordBatch, ...]]: + """Create grouped data for agg Arrow UDFs (plain batches, not struct-wrapped).""" + type_cycle = [ + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + lambda r: pa.array(np.random.rand(r)), + lambda r: pa.array(np.random.randint(0, 100, r, dtype=np.int64)), + lambda r: pa.array(np.random.rand(r)), + ] -def _make_mixed_batch(rows_per_group): - """``id``, ``str_col``, ``float_col``, ``double_col``, ``long_col``.""" - arrays = [ - pa.array(np.zeros(rows_per_group, dtype=np.int64)), - pa.array([f"s{j}" for j in range(rows_per_group)]), - pa.array(np.random.rand(rows_per_group).astype(np.float32)), - pa.array(np.random.rand(rows_per_group)), - pa.array(np.zeros(rows_per_group, dtype=np.int64)), - ] - fields = [ - StructField("id", IntegerType()), - StructField("str_col", StringType()), - StructField("float_col", DoubleType()), - StructField("double_col", DoubleType()), - StructField("long_col", IntegerType()), - ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - StructType(fields), - ) + groups = [] + for _ in range(num_groups): + arrays = [type_cycle[i % len(type_cycle)](rows_per_group) for i in range(n_cols)] + names = [f"col_{i}" for i in range(n_cols)] + batch = pa.RecordBatch.from_arrays(arrays, names=names) + groups.append((batch,)) + return groups -def _build_grouped_arg_offsets(n_cols, n_keys=0): - """``[len, num_keys, key_col_0, ..., val_col_0, ...]``""" - keys = list(range(n_keys)) - vals = list(range(n_keys, n_cols)) - offsets = [n_keys] + keys + vals - return [len(offsets)] + offsets + @staticmethod + def wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch: + """Wrap all columns into a single struct column, mimicking JVM-side encoding.""" + struct_array = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names) + return pa.RecordBatch.from_arrays([struct_array], names=["_0"]) # --------------------------------------------------------------------------- @@ -258,7 +380,7 @@ def setup(self, *args): self._args = args def peakmem_worker(self, *args): - _run_worker_from_replayed_file(lambda buf: self._write_scenario(*self._args, buf)) + MockProtocolWriter.run_worker_from_replayed_file(lambda buf: self._write_scenario(*self._args, buf)) # --------------------------------------------------------------------------- @@ -272,13 +394,6 @@ def peakmem_worker(self, *args): # type when the UDF declaration leaves it as ``None``. -def _make_pure_batch(rows, n_cols, make_array, spark_type): - """Create a batch where all columns share the same Arrow type.""" - arrays = [make_array(rows) for _ in range(n_cols)] - fields = [StructField(f"col_{i}", spark_type) for i in range(n_cols)] - return pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]) - - def _build_non_grouped_scenarios(): """Build data-shape scenarios for non-grouped Arrow eval types. @@ -293,14 +408,14 @@ def _build_non_grouped_scenarios(): "lg_batch_few_col": (10_000, 5, 3_500), "lg_batch_many_col": (10_000, 50, 400), }.items(): - batch, col0_type = _make_typed_batch(rows, n_cols) + batch, col0_type = MockDataFactory.make_typed_batch(rows, n_cols) scenarios[name] = (batch, num_batches, col0_type) # Pure-type scenarios (5000 rows, 10 cols, 1000 batches) _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 1_000 scenarios["pure_ints"] = ( - _make_pure_batch( + MockDataFactory.make_pure_batch( _PURE_ROWS, _PURE_COLS, lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), @@ -310,7 +425,7 @@ def _build_non_grouped_scenarios(): IntegerType(), ) scenarios["pure_floats"] = ( - _make_pure_batch( + MockDataFactory.make_pure_batch( _PURE_ROWS, _PURE_COLS, lambda r: pa.array(np.random.rand(r)), @@ -320,7 +435,7 @@ def _build_non_grouped_scenarios(): DoubleType(), ) scenarios["pure_strings"] = ( - _make_pure_batch( + MockDataFactory.make_pure_batch( _PURE_ROWS, _PURE_COLS, lambda r: pa.array([f"s{j}" for j in range(r)]), @@ -330,7 +445,7 @@ def _build_non_grouped_scenarios(): StringType(), ) scenarios["pure_ts"] = ( - _make_pure_batch( + MockDataFactory.make_pure_batch( _PURE_ROWS, _PURE_COLS, lambda r: pa.array( @@ -342,7 +457,7 @@ def _build_non_grouped_scenarios(): TimestampNTZType(), ) scenarios["mixed_types"] = ( - _make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], + MockDataFactory.make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], _PURE_BATCHES, IntegerType(), ) @@ -374,7 +489,7 @@ def _build_arrow_batched_scenarios(): "lg_batch_few_col": (10_000, 5, 5), "lg_batch_many_col": (10_000, 50, 2), }.items(): - batch, col0_type = _make_typed_batch(rows, n_cols) + batch, col0_type = MockDataFactory.make_typed_batch(rows, n_cols) type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()] input_struct = StructType( [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(n_cols)] @@ -392,12 +507,12 @@ def _build_arrow_batched_scenarios(): ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()), ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), ]: - batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) + batch = MockDataFactory.make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) input_struct = StructType([StructField(f"col_{i}", spark_type) for i in range(_PURE_COLS)]) scenarios[scenario_name] = (batch, _PURE_BATCHES, input_struct, spark_type) # mixed types - batch, col0_type = _make_typed_batch(_PURE_ROWS, _PURE_COLS) + batch, col0_type = MockDataFactory.make_typed_batch(_PURE_ROWS, _PURE_COLS) type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()] input_struct = StructType( [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(_PURE_COLS)] @@ -433,13 +548,13 @@ def _write_scenario(self, scenario, udf_name, buf): def write_command(b): # input_type is read before UDF payloads for ARROW_BATCHED_UDF - _write_utf8(input_struct.json(), b) - _build_udf_payload(udf_func, ret_type, arg_offsets, b) + MockProtocolWriter.write_utf8(input_struct.json(), b) + MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b) - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_ARROW_BATCHED_UDF, write_command, - lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b), + lambda b: MockProtocolWriter.write_arrow_ipc_batches((batch for _ in range(num_batches)), b), buf, ) @@ -452,83 +567,6 @@ class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase): pass -# --------------------------------------------------------------------------- -# Grouped Arrow UDF helpers -# --------------------------------------------------------------------------- - - -def _write_grouped_arrow_data( - groups: list[tuple[pa.RecordBatch, ...]], - num_dfs: int, - buf: io.BufferedIOBase, - max_records_per_batch: int | None = None, -) -> None: - """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes.""" - for group in groups: - write_int(num_dfs, buf) - for df_batch in group: - if max_records_per_batch and df_batch.num_rows > max_records_per_batch: - sub_batches = [ - df_batch.slice(offset, max_records_per_batch) - for offset in range(0, df_batch.num_rows, max_records_per_batch) - ] - _write_arrow_ipc_batches(iter(sub_batches), buf) - else: - _write_arrow_ipc_batches(iter([df_batch]), buf) - write_int(0, buf) # end of groups - - -def _make_grouped_arg_offsets(num_key_cols: int, num_value_cols: int) -> list[int]: - """Build arg_offsets for grouped map UDFs. - - Format: [total_len, num_keys, key_idx..., value_idx...] - All columns are in a single struct, so key indexes come first, - then value indexes. - """ - total = num_key_cols + num_value_cols + 1 # +1 for the num_keys prefix - key_idxs = list(range(num_key_cols)) - value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols)) - return [total, num_key_cols] + key_idxs + value_idxs - - -def _make_grouped_batches( - num_groups: int, - rows_per_group: int, - num_key_cols: int, - num_value_cols: int, -) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]: - """Create grouped data: each group is a single batch wrapped in a struct column. - - Returns (groups, return_type) where each group is a 1-tuple of a - struct-wrapped RecordBatch suitable for ArrowStreamGroupUDFSerializer. - """ - n_cols = num_key_cols + num_value_cols - type_cycle = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), LongType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - (lambda r: pa.array(np.random.rand(r)), DoubleType()), - ] - - groups = [] - for g in range(num_groups): - arrays = [type_cycle[i % len(type_cycle)][0](rows_per_group) for i in range(n_cols)] - names = [f"col_{i}" for i in range(n_cols)] - batch = pa.RecordBatch.from_arrays(arrays, names=names) - # Wrap in struct to match JVM-side encoding (flatten_struct undoes this) - struct_array = pa.StructArray.from_arrays(batch.columns, names=names) - wrapped = pa.RecordBatch.from_arrays([struct_array], names=["_0"]) - groups.append((wrapped,)) - - value_fields = [ - StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) - for i in range(num_key_cols, n_cols) - ] - return_type = StructType(value_fields) - - return groups, return_type - - # -- SQL_GROUPED_AGG_ARROW_UDF ------------------------------------------------ # UDF receives pa.Array columns per group, returns scalar. @@ -572,29 +610,6 @@ def _grouped_agg_arrow_iter_mean_multi(batch_iter): return total -def _make_agg_arrow_groups( - num_groups: int, - rows_per_group: int, - n_cols: int, -) -> list[tuple[pa.RecordBatch, ...]]: - """Create grouped data for agg Arrow UDFs (plain batches, not struct-wrapped).""" - type_cycle = [ - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - lambda r: pa.array(np.random.rand(r)), - lambda r: pa.array(np.random.randint(0, 100, r, dtype=np.int64)), - lambda r: pa.array(np.random.rand(r)), - ] - - groups = [] - for _ in range(num_groups): - arrays = [type_cycle[i % len(type_cycle)](rows_per_group) for i in range(n_cols)] - names = [f"col_{i}" for i in range(n_cols)] - batch = pa.RecordBatch.from_arrays(arrays, names=names) - groups.append((batch,)) - - return groups - - def _build_grouped_agg_arrow_scenarios(): """Build scenarios for SQL_GROUPED_AGG_ARROW_UDF / AGG_ARROW_ITER_UDF. @@ -610,7 +625,7 @@ def _build_grouped_agg_arrow_scenarios(): "many_groups_lg": (500, 10_000, 5), "wide_cols": (200, 5_000, 20), }.items(): - groups = _make_agg_arrow_groups(num_groups, rows_per_group, n_cols) + groups = MockDataFactory.make_agg_arrow_groups(num_groups, rows_per_group, n_cols) scenarios[name] = (groups, n_cols) return scenarios @@ -640,12 +655,12 @@ def _write_scenario(self, scenario, udf_name, buf): return_type = DoubleType() def write_command(b): - _build_udf_payload(udf_func, return_type, arg_offsets, b) + MockProtocolWriter.build_udf_payload(udf_func, return_type, arg_offsets, b) - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF, write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), buf, ) @@ -681,12 +696,12 @@ def _write_scenario(self, scenario, udf_name, buf): return_type = DoubleType() def write_command(b): - _build_udf_payload(udf_func, return_type, arg_offsets, b) + MockProtocolWriter.build_udf_payload(udf_func, return_type, arg_offsets, b) - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF, write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), buf, ) @@ -736,10 +751,10 @@ def _build_grouped_map_arrow_scenarios(): "wide_values": (200, 5_000, 1, 20), "multi_key": (200, 5_000, 3, 5), }.items(): - groups, return_type = _make_grouped_batches( + groups, return_type = MockDataFactory.make_grouped_batches( num_groups, rows_per_group, num_key_cols, num_value_cols ) - arg_offsets = _make_grouped_arg_offsets(num_key_cols, num_value_cols) + arg_offsets = MockProtocolWriter.make_grouped_arg_offsets(num_key_cols, num_value_cols) scenarios[name] = (groups, return_type, arg_offsets) return scenarios @@ -767,17 +782,17 @@ def write_command(b): write_int(len(arg_offsets), b) # num_arg for offset in arg_offsets: write_int(offset, b) - _write_bool(False, b) # is_kwarg + MockProtocolWriter.write_bool(False, b) # is_kwarg write_int(1, b) # num_chained command = cloudpickle_dumps((udf_func, return_type)) write_int(len(command), b) b.write(command) write_long(0, b) # result_id - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF, write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), buf, ) @@ -835,17 +850,17 @@ def write_command(b): write_int(len(arg_offsets), b) # num_arg for offset in arg_offsets: write_int(offset, b) - _write_bool(False, b) # is_kwarg + MockProtocolWriter.write_bool(False, b) # is_kwarg write_int(1, b) # num_chained command = cloudpickle_dumps((udf_func, return_type)) write_int(len(command), b) b.write(command) write_long(0, b) # result_id - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF, write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), buf, ) @@ -875,11 +890,11 @@ def _build_grouped_map_pandas_scenarios(): "lg_grp_few_col": (100_000, 5, 30, _MAX_RECORDS_PER_BATCH), "lg_grp_many_col": (100_000, 50, 5, _MAX_RECORDS_PER_BATCH), }.items(): - batch, schema = _make_grouped_batch(rows, n_cols) + batch, schema = MockDataFactory.make_grouped_batch(rows, n_cols) scenarios[name] = (batch, num_groups, schema, max_rpb) # mixed column types, small groups - batch, schema = _make_mixed_batch(3) + batch, schema = MockDataFactory.make_mixed_batch(3) scenarios["mixed_types"] = (batch, 200, schema, None) return scenarios @@ -912,11 +927,11 @@ def _write_scenario(self, scenario, udf_name, buf): # so the return schema must also exclude them. ret_type = StructType(schema.fields[n_args - 1 :]) if n_args > 1 else schema n_cols = len(schema.fields) - arg_offsets = _build_grouped_arg_offsets(n_cols, n_keys=n_args - 1) - _write_worker_input( + arg_offsets = MockProtocolWriter.build_grouped_arg_offsets(n_cols, n_keys=n_args - 1) + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_grouped_arrow_data( + lambda b: MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_grouped_arrow_data( [(batch,)] * num_groups, num_dfs=1, buf=b, @@ -973,7 +988,7 @@ def _build_map_arrow_iter_scenarios(): "lg_batch_few_col": (10_000, 5, 500), "lg_batch_many_col": (10_000, 50, 50), }.items(): - batch, col0_type = _make_typed_batch(rows, n_cols) + batch, col0_type = MockDataFactory.make_typed_batch(rows, n_cols) scenarios[name] = (batch, num_batches, col0_type) _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 200 @@ -994,11 +1009,11 @@ def _build_map_arrow_iter_scenarios(): TimestampNTZType(), ), ]: - batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) + batch = MockDataFactory.make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) scenarios[scenario_name] = (batch, _PURE_BATCHES, spark_type) scenarios["mixed_types"] = ( - _make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], + MockDataFactory.make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], _PURE_BATCHES, IntegerType(), ) @@ -1006,12 +1021,6 @@ def _build_map_arrow_iter_scenarios(): return scenarios -def _wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch: - """Wrap all columns into a single struct column, mimicking JVM-side encoding.""" - struct_array = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names) - return pa.RecordBatch.from_arrays([struct_array], names=["_0"]) - - class _MapArrowIterBenchMixin: """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF. @@ -1033,11 +1042,11 @@ def _write_scenario(self, scenario, udf_name, buf): udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: ret_type = col0_type - wrapped = _wrap_batch_in_struct(batch) - _write_worker_input( + wrapped = MockDataFactory.wrap_batch_in_struct(batch) + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_MAP_ARROW_ITER_UDF, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_arrow_ipc_batches((wrapped for _ in range(num_batches)), b), + lambda b: MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_arrow_ipc_batches((wrapped for _ in range(num_batches)), b), buf, ) @@ -1086,10 +1095,10 @@ def _write_scenario(self, scenario, udf_name, buf): udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: ret_type = col0_type - _write_worker_input( + MockProtocolWriter.write_worker_input( self._eval_type, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b), + lambda b: MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_arrow_ipc_batches((batch for _ in range(num_batches)), b), buf, ) From 49a977196c730ccdb1ee035a2eac60cbbc4c09d7 Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 26 Mar 2026 17:17:30 -0700 Subject: [PATCH 3/4] refactor: unify MockDataFactory API with keyword args, batch_size default, and struct type support --- python/benchmarks/bench_eval_type.py | 1202 +++++++++++++------------- 1 file changed, 577 insertions(+), 625 deletions(-) diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py index 21871259e71d..091fd7868374 100644 --- a/python/benchmarks/bench_eval_type.py +++ b/python/benchmarks/bench_eval_type.py @@ -34,6 +34,8 @@ import numpy as np import pyarrow as pa +np.random.seed(42) + from pyspark.cloudpickle import dumps as cloudpickle_dumps from pyspark.serializers import write_int, write_long from pyspark.sql.types import ( @@ -41,7 +43,6 @@ BooleanType, DoubleType, IntegerType, - LongType, StringType, StructField, StructType, @@ -50,33 +51,51 @@ from pyspark.util import PythonEvalType from pyspark.worker import main as worker_main + # --------------------------------------------------------------------------- -# Mock protocol I/O helpers +# Mock helpers: protocol writer, data factory, UDF factory # --------------------------------------------------------------------------- class MockProtocolWriter: """Constructs the binary wire protocol that ``worker.py``'s ``main()`` expects.""" - @staticmethod - def write_utf8(s: str, buf: io.BytesIO) -> None: - """Write a length-prefixed UTF-8 string (matches ``UTF8Deserializer.loads``).""" - encoded = s.encode("utf-8") - write_int(len(encoded), buf) - buf.write(encoded) - - @staticmethod - def write_bool(val: bool, buf: io.BytesIO) -> None: + @classmethod + def write_bool(cls, val: bool, buf: io.BytesIO) -> None: buf.write(struct.pack("!?", val)) - @staticmethod - def build_preamble(buf: io.BytesIO) -> None: + @classmethod + def write_data_payload( + cls, batch_iter: Iterator[pa.RecordBatch], buf: io.BufferedIOBase + ) -> None: + """Write a plain Arrow IPC stream from an iterator of Arrow batches.""" + first_batch = next(batch_iter) + writer = pa.RecordBatchStreamWriter(buf, first_batch.schema) + writer.write_batch(first_batch) + for batch in batch_iter: + writer.write_batch(batch) + writer.close() + + @classmethod + def write_grouped_data_payload( + cls, + groups: list[tuple[pa.RecordBatch, ...]], + num_dfs: int, + buf: io.BufferedIOBase, + ) -> None: + """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes.""" + for group in groups: + write_int(num_dfs, buf) + for df_batch in group: + cls.write_data_payload(iter([df_batch]), buf) + write_int(0, buf) # end of groups + + @classmethod + def write_preamble(cls, buf: io.BytesIO) -> None: """Write everything ``main()`` reads before ``eval_type``.""" write_int(0, buf) # split_index - MockProtocolWriter.write_utf8( - f"{sys.version_info[0]}.{sys.version_info[1]}", buf - ) # python version - MockProtocolWriter.write_utf8( + cls.write_utf8(f"{sys.version_info[0]}.{sys.version_info[1]}", buf) # python version + cls.write_utf8( json.dumps( { "isBarrier": False, @@ -91,13 +110,14 @@ def build_preamble(buf: io.BytesIO) -> None: ), buf, ) - MockProtocolWriter.write_utf8("/tmp", buf) # spark_files_dir + cls.write_utf8("/tmp", buf) # spark_files_dir write_int(0, buf) # num_python_includes - MockProtocolWriter.write_bool(False, buf) # needs_broadcast_decryption_server + cls.write_bool(False, buf) # needs_broadcast_decryption_server write_int(0, buf) # num_broadcast_variables - @staticmethod - def build_udf_payload( + @classmethod + def write_udf_payload( + cls, udf_func: Callable[..., Any], return_type: StructType, arg_offsets: list[int], @@ -108,240 +128,145 @@ def build_udf_payload( write_int(len(arg_offsets), buf) # num_arg for offset in arg_offsets: write_int(offset, buf) - MockProtocolWriter.write_bool(False, buf) # is_kwarg + cls.write_bool(False, buf) # is_kwarg write_int(1, buf) # num_chained command = cloudpickle_dumps((udf_func, return_type)) write_int(len(command), buf) buf.write(command) write_long(0, buf) # result_id - @staticmethod - def write_arrow_ipc_batches( - batch_iter: Iterator[pa.RecordBatch], buf: io.BufferedIOBase - ) -> None: - """Write a plain Arrow IPC stream from an iterator of Arrow batches.""" - first_batch = next(batch_iter) - writer = pa.RecordBatchStreamWriter(buf, first_batch.schema) - writer.write_batch(first_batch) - for batch in batch_iter: - writer.write_batch(batch) - writer.close() + @classmethod + def write_utf8(cls, s: str, buf: io.BytesIO) -> None: + """Write a length-prefixed UTF-8 string (matches ``UTF8Deserializer.loads``).""" + encoded = s.encode("utf-8") + write_int(len(encoded), buf) + buf.write(encoded) - @staticmethod + @classmethod def write_worker_input( + cls, eval_type: int, - write_command: Callable[[io.BufferedIOBase], None], + write_udf: Callable[[io.BufferedIOBase], None], write_data: Callable[[io.BufferedIOBase], None], buf: io.BufferedIOBase, ) -> None: """Write the full worker binary stream: preamble + command + data + end.""" - MockProtocolWriter.build_preamble(buf) + cls.write_preamble(buf) write_int(eval_type, buf) write_int(0, buf) # RunnerConf (0 key-value pairs) write_int(0, buf) # EvalConf (0 key-value pairs) - write_command(buf) + write_udf(buf) write_data(buf) write_int(-4, buf) # SpecialLengths.END_OF_STREAM - @staticmethod - def run_worker_from_replayed_file( - write_input: Callable[[io.BufferedIOBase], None], - ) -> None: - """Write input to a temp file, then replay it through ``worker_main``.""" - fd, path = tempfile.mkstemp(prefix="spark-bench-replay-", suffix=".bin") - try: - with os.fdopen(fd, "w+b") as infile: - write_input(infile) - infile.flush() - infile.seek(0) - worker_main(infile, io.BytesIO()) - finally: - try: - os.remove(path) - except FileNotFoundError: - pass - - @staticmethod - def write_grouped_arrow_data( - groups: list[tuple[pa.RecordBatch, ...]], - num_dfs: int, - buf: io.BufferedIOBase, - max_records_per_batch: int | None = None, - ) -> None: - """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes.""" - for group in groups: - write_int(num_dfs, buf) - for df_batch in group: - if max_records_per_batch and df_batch.num_rows > max_records_per_batch: - sub_batches = [ - df_batch.slice(offset, max_records_per_batch) - for offset in range(0, df_batch.num_rows, max_records_per_batch) - ] - MockProtocolWriter.write_arrow_ipc_batches(iter(sub_batches), buf) - else: - MockProtocolWriter.write_arrow_ipc_batches(iter([df_batch]), buf) - write_int(0, buf) # end of groups - - @staticmethod - def build_grouped_arg_offsets(n_cols, n_keys=0): - """``[len, num_keys, key_col_0, ..., val_col_0, ...]``""" - keys = list(range(n_keys)) - vals = list(range(n_keys, n_cols)) - offsets = [n_keys] + keys + vals - return [len(offsets)] + offsets - - @staticmethod - def make_grouped_arg_offsets(num_key_cols: int, num_value_cols: int) -> list[int]: - """Build arg_offsets for grouped map UDFs. - - Format: [total_len, num_keys, key_idx..., value_idx...] - All columns are in a single struct, so key indexes come first, - then value indexes. - """ - total = num_key_cols + num_value_cols + 1 # +1 for the num_keys prefix - key_idxs = list(range(num_key_cols)) - value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols)) - return [total, num_key_cols] + key_idxs + value_idxs - - -# --------------------------------------------------------------------------- -# Mock data payload helpers -# --------------------------------------------------------------------------- - class MockDataFactory: """Creates mock Arrow batches and group structures for benchmarks.""" - @staticmethod - def make_typed_batch(rows: int, n_cols: int) -> tuple[pa.RecordBatch, IntegerType]: - """Columns cycling through int64, string, binary, boolean.""" - type_cycle = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - ] - arrays = [type_cycle[i % len(type_cycle)][0](rows) for i in range(n_cols)] + MAX_RECORDS_PER_BATCH = 10_000 + + MIXED_TYPES = [ + (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), + (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), + (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), + (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), + ] + + NUMERIC_TYPES = [ + (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), + (lambda r: pa.array(np.random.rand(r)), DoubleType()), + ] + + @classmethod + def make_struct_type( + cls, + *, + num_fields: int, + base_types: list[tuple[Callable, Any]], + ) -> tuple[Callable, StructType]: + """Compose flat ``base_types`` into a single struct type pool entry. + + Returns ``(factory_fn, StructType)`` suitable for inclusion in a + ``spark_types`` list. The factory produces a ``pa.StructArray`` whose + sub-fields cycle through ``base_types``. + """ fields = [ - StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) for i in range(n_cols) - ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - IntegerType(), - ) - - @staticmethod - def make_grouped_batch(rows_per_group, n_cols): - """``group_key (int64)`` + ``(n_cols - 1)`` float32 value columns.""" - arrays = [pa.array(np.zeros(rows_per_group, dtype=np.int64))] + [ - pa.array(np.random.rand(rows_per_group).astype(np.float32)) - for _ in range(n_cols - 1) - ] - fields = [StructField("group_key", IntegerType())] + [ - StructField(f"val_{i}", DoubleType()) for i in range(n_cols - 1) + StructField(f"col_{i}", base_types[i % len(base_types)][1]) for i in range(num_fields) ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - StructType(fields), - ) - @staticmethod - def make_mixed_batch(rows_per_group): - """``id``, ``str_col``, ``float_col``, ``double_col``, ``long_col``.""" - arrays = [ - pa.array(np.zeros(rows_per_group, dtype=np.int64)), - pa.array([f"s{j}" for j in range(rows_per_group)]), - pa.array(np.random.rand(rows_per_group).astype(np.float32)), - pa.array(np.random.rand(rows_per_group)), - pa.array(np.zeros(rows_per_group, dtype=np.int64)), - ] - fields = [ - StructField("id", IntegerType()), - StructField("str_col", StringType()), - StructField("float_col", DoubleType()), - StructField("double_col", DoubleType()), - StructField("long_col", IntegerType()), - ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - StructType(fields), + def factory(r): + arrays = [base_types[i % len(base_types)][0](r) for i in range(num_fields)] + names = [f.name for f in fields] + return pa.StructArray.from_arrays(arrays, names=names) + + return (factory, StructType(fields)) + + @classmethod + def make_batches( + cls, + *, + num_rows: int, + num_cols: int, + spark_types: list[tuple[Callable, Any]], + batch_size: int = MAX_RECORDS_PER_BATCH, + ) -> tuple[list[pa.RecordBatch], StructType]: + """Create RecordBatches with columns cycling through ``spark_types``. + + Splits ``num_rows`` into batches of at most ``batch_size`` rows. + Each ``spark_types`` entry is ``(factory_fn, SparkType)``; if an entry + produces a ``pa.StructArray``, it becomes a struct column naturally. + """ + batches = [] + for offset in range(0, num_rows, batch_size): + rows = min(batch_size, num_rows - offset) + arrays = [spark_types[i % len(spark_types)][0](rows) for i in range(num_cols)] + names = [f"col_{i}" for i in range(num_cols)] + batches.append(pa.RecordBatch.from_arrays(arrays, names=names)) + schema = StructType( + [StructField(f"col_{i}", spark_types[i % len(spark_types)][1]) for i in range(num_cols)] ) + return batches, schema - @staticmethod - def make_pure_batch(rows, n_cols, make_array, spark_type): - """Create a batch where all columns share the same Arrow type.""" - arrays = [make_array(rows) for _ in range(n_cols)] - fields = [StructField(f"col_{i}", spark_type) for i in range(n_cols)] - return pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]) - - @staticmethod - def make_grouped_batches( + @classmethod + def make_batch_groups( + cls, + *, num_groups: int, - rows_per_group: int, - num_key_cols: int, - num_value_cols: int, + num_rows: int, + num_cols: int, + spark_types: list[tuple[Callable, Any]], + batch_size: int = MAX_RECORDS_PER_BATCH, ) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]: - """Create grouped data: each group is a single batch wrapped in a struct column. + """Create groups of batches. - Returns (groups, return_type) where each group is a 1-tuple of a - struct-wrapped RecordBatch suitable for ArrowStreamGroupUDFSerializer. + Each group has ``num_rows`` total rows, split into batches of ``batch_size``. """ - n_cols = num_key_cols + num_value_cols - type_cycle = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), LongType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - (lambda r: pa.array(np.random.rand(r)), DoubleType()), - ] - - groups = [] - for g in range(num_groups): - arrays = [ - type_cycle[i % len(type_cycle)][0](rows_per_group) for i in range(n_cols) - ] - names = [f"col_{i}" for i in range(n_cols)] - batch = pa.RecordBatch.from_arrays(arrays, names=names) - # Wrap in struct to match JVM-side encoding (flatten_struct undoes this) - struct_array = pa.StructArray.from_arrays(batch.columns, names=names) - wrapped = pa.RecordBatch.from_arrays([struct_array], names=["_0"]) - groups.append((wrapped,)) - - value_fields = [ - StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) - for i in range(num_key_cols, n_cols) - ] - return_type = StructType(value_fields) - - return groups, return_type - - @staticmethod - def make_agg_arrow_groups( - num_groups: int, - rows_per_group: int, - n_cols: int, - ) -> list[tuple[pa.RecordBatch, ...]]: - """Create grouped data for agg Arrow UDFs (plain batches, not struct-wrapped).""" - type_cycle = [ - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - lambda r: pa.array(np.random.rand(r)), - lambda r: pa.array(np.random.randint(0, 100, r, dtype=np.int64)), - lambda r: pa.array(np.random.rand(r)), - ] - groups = [] for _ in range(num_groups): - arrays = [type_cycle[i % len(type_cycle)](rows_per_group) for i in range(n_cols)] - names = [f"col_{i}" for i in range(n_cols)] - batch = pa.RecordBatch.from_arrays(arrays, names=names) - groups.append((batch,)) + batches, _ = cls.make_batches( + num_rows=num_rows, + num_cols=num_cols, + spark_types=spark_types, + batch_size=batch_size, + ) + groups.append(tuple(batches)) + + schema = StructType( + [StructField(f"col_{i}", spark_types[i % len(spark_types)][1]) for i in range(num_cols)] + ) + return groups, schema - return groups - @staticmethod - def wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch: - """Wrap all columns into a single struct column, mimicking JVM-side encoding.""" - struct_array = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names) - return pa.RecordBatch.from_arrays([struct_array], names=["_0"]) +class MockUDFFactory: + """Constructs UDF command payloads for the worker protocol.""" + + @classmethod + def make_grouped_arg_offsets(cls, num_key_cols: int, num_value_cols: int) -> list[int]: + """Build arg_offsets: ``[total_len, num_keys, key_idx..., value_idx...]``.""" + key_idxs = list(range(num_key_cols)) + value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols)) + offsets = [num_key_cols] + key_idxs + value_idxs + return [len(offsets)] + offsets # --------------------------------------------------------------------------- @@ -380,146 +305,22 @@ def setup(self, *args): self._args = args def peakmem_worker(self, *args): - MockProtocolWriter.run_worker_from_replayed_file(lambda buf: self._write_scenario(*self._args, buf)) - - -# --------------------------------------------------------------------------- -# Non-grouped Arrow UDF benchmarks -# --------------------------------------------------------------------------- - -# Data-shape scenarios shared by all non-grouped eval types. -# Each entry maps to a ``(batch, num_batches, col0_type)`` tuple; the UDF is -# selected independently via the ``udf`` ASV parameter. -# ``col0_type`` is the Spark type of column 0, used as the default UDF return -# type when the UDF declaration leaves it as ``None``. - - -def _build_non_grouped_scenarios(): - """Build data-shape scenarios for non-grouped Arrow eval types. - - Returns a dict mapping scenario name to ``(batch, num_batches, col0_type)``. - """ - scenarios = {} - - # Varying batch size and column count (mixed types cycling int/str/bin/bool) - for name, (rows, n_cols, num_batches) in { - "sm_batch_few_col": (1_000, 5, 1_500), - "sm_batch_many_col": (1_000, 50, 200), - "lg_batch_few_col": (10_000, 5, 3_500), - "lg_batch_many_col": (10_000, 50, 400), - }.items(): - batch, col0_type = MockDataFactory.make_typed_batch(rows, n_cols) - scenarios[name] = (batch, num_batches, col0_type) - - # Pure-type scenarios (5000 rows, 10 cols, 1000 batches) - _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 1_000 - - scenarios["pure_ints"] = ( - MockDataFactory.make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - IntegerType(), - ), - _PURE_BATCHES, - IntegerType(), - ) - scenarios["pure_floats"] = ( - MockDataFactory.make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array(np.random.rand(r)), - DoubleType(), - ), - _PURE_BATCHES, - DoubleType(), - ) - scenarios["pure_strings"] = ( - MockDataFactory.make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array([f"s{j}" for j in range(r)]), - StringType(), - ), - _PURE_BATCHES, - StringType(), - ) - scenarios["pure_ts"] = ( - MockDataFactory.make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array( - np.arange(0, r, dtype="datetime64[us]"), type=pa.timestamp("us", tz=None) - ), - TimestampNTZType(), - ), - _PURE_BATCHES, - TimestampNTZType(), - ) - scenarios["mixed_types"] = ( - MockDataFactory.make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], - _PURE_BATCHES, - IntegerType(), - ) - - return scenarios + fd, path = tempfile.mkstemp(prefix="spark-bench-replay-", suffix=".bin") + try: + with os.fdopen(fd, "w+b") as infile: + self._write_scenario(*self._args, infile) + infile.flush() + infile.seek(0) + worker_main(infile, io.BytesIO()) + finally: + try: + os.remove(path) + except FileNotFoundError: + pass # -- SQL_ARROW_BATCHED_UDF -------------------------------------------------- -# Arrow-optimized Python UDF: receives individual Python values per row, -# returns a single Python value. The wire protocol includes an extra -# ``input_type`` (StructType JSON) before the UDF payload. - - -def _build_arrow_batched_scenarios(): - """Build scenarios for SQL_ARROW_BATCHED_UDF. - - Returns a dict mapping scenario name to - ``(batch, num_batches, input_struct_type, col0_type)``. - ``input_struct_type`` is a StructType matching the batch schema, - needed for the wire protocol. - """ - scenarios = {} - - # Row-by-row processing is ~100x slower than columnar Arrow UDFs, - # so batch counts are much smaller to keep benchmarks under 60s. - for name, (rows, n_cols, num_batches) in { - "sm_batch_few_col": (1_000, 5, 20), - "sm_batch_many_col": (1_000, 50, 5), - "lg_batch_few_col": (10_000, 5, 5), - "lg_batch_many_col": (10_000, 50, 2), - }.items(): - batch, col0_type = MockDataFactory.make_typed_batch(rows, n_cols) - type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()] - input_struct = StructType( - [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(n_cols)] - ) - scenarios[name] = (batch, num_batches, input_struct, col0_type) - - _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 10 - - for scenario_name, make_array, spark_type in [ - ( - "pure_ints", - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - IntegerType(), - ), - ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()), - ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - ]: - batch = MockDataFactory.make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) - input_struct = StructType([StructField(f"col_{i}", spark_type) for i in range(_PURE_COLS)]) - scenarios[scenario_name] = (batch, _PURE_BATCHES, input_struct, spark_type) - - # mixed types - batch, col0_type = MockDataFactory.make_typed_batch(_PURE_ROWS, _PURE_COLS) - type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()] - input_struct = StructType( - [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(_PURE_COLS)] - ) - scenarios["mixed_types"] = (batch, _PURE_BATCHES, input_struct, col0_type) - - return scenarios +# UDF receives individual Python values per row, returns a single Python value. class _ArrowBatchedBenchMixin: @@ -529,7 +330,67 @@ class _ArrowBatchedBenchMixin: requires before the UDF payload. """ - _scenarios = _build_arrow_batched_scenarios() + def _build_scenarios(): + """Build scenarios for SQL_ARROW_BATCHED_UDF. + + Returns a dict mapping scenario name to + ``(batches, input_struct_type, col0_type)``. + ``input_struct_type`` is a StructType matching the batch schema, + needed for the wire protocol. + """ + scenarios = {} + + # Row-by-row processing is ~100x slower than columnar Arrow UDFs, + # so batch counts are much smaller to keep benchmarks under 60s. + for name, (num_rows, num_cols, batch_size) in { + "sm_batch_few_col": (20_000, 5, 1_000), + "sm_batch_many_col": (5_000, 50, 1_000), + "lg_batch_few_col": (50_000, 5, 10_000), + "lg_batch_many_col": (20_000, 50, 10_000), + }.items(): + batches, schema = MockDataFactory.make_batches( + num_rows=num_rows, + num_cols=num_cols, + spark_types=MockDataFactory.MIXED_TYPES, + batch_size=batch_size, + ) + scenarios[name] = (batches, schema, schema.fields[0].dataType) + + _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 50_000, 10, 5_000 + + for scenario_name, spark_types in [ + ( + "pure_ints", + [ + ( + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + IntegerType(), + ) + ], + ), + ("pure_floats", [(lambda r: pa.array(np.random.rand(r)), DoubleType())]), + ("pure_strings", [(lambda r: pa.array([f"s{j}" for j in range(r)]), StringType())]), + ]: + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_types=spark_types, + batch_size=_BATCH_SIZE, + ) + scenarios[scenario_name] = (batches, schema, schema.fields[0].dataType) + + # mixed types + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_types=MockDataFactory.MIXED_TYPES, + batch_size=_BATCH_SIZE, + ) + scenarios["mixed_types"] = (batches, schema, schema.fields[0].dataType) + + return scenarios + + _scenarios = _build_scenarios() # UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values. # arg_offsets=[0] means the UDF receives column 0 value per row. _udfs = { @@ -541,20 +402,20 @@ class _ArrowBatchedBenchMixin: param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, input_struct, col0_type = self._scenarios[scenario] + batches, input_struct, col0_type = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: ret_type = col0_type - def write_command(b): + def write_udf(b): # input_type is read before UDF payloads for ARROW_BATCHED_UDF MockProtocolWriter.write_utf8(input_struct.json(), b) - MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b) + MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_ARROW_BATCHED_UDF, - write_command, - lambda b: MockProtocolWriter.write_arrow_ipc_batches((batch for _ in range(num_batches)), b), + write_udf, + lambda b: MockProtocolWriter.write_data_payload(iter(batches), b), buf, ) @@ -568,73 +429,51 @@ class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase): # -- SQL_GROUPED_AGG_ARROW_UDF ------------------------------------------------ -# UDF receives pa.Array columns per group, returns scalar. - - -def _grouped_agg_arrow_sum(col): - """Sum a single Arrow column.""" - import pyarrow.compute as pc - - return pc.sum(col).as_py() - - -def _grouped_agg_arrow_mean_multi(col0, col1): - """Mean of two Arrow columns combined.""" - import pyarrow.compute as pc - - return (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) - +# UDF receives ``pa.Array`` columns per group, returns scalar. -# -- SQL_GROUPED_AGG_ARROW_ITER_UDF ------------------------------------------ -# UDF receives Iterator[pa.Array] (or Iterator[Tuple[pa.Array, ...]]) per group, -# returns scalar. +class _GroupedAggArrowBenchMixin: + """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_UDF.""" -def _grouped_agg_arrow_iter_sum(batch_iter): - """Sum across batches via iterator.""" - import pyarrow.compute as pc - - total = 0 - for col in batch_iter: - total += pc.sum(col).as_py() or 0 - return total - - -def _grouped_agg_arrow_iter_mean_multi(batch_iter): - """Mean across batches of tuples via iterator.""" - import pyarrow.compute as pc - - total = 0.0 - for col0, col1 in batch_iter: - total += (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) - return total - - -def _build_grouped_agg_arrow_scenarios(): - """Build scenarios for SQL_GROUPED_AGG_ARROW_UDF / AGG_ARROW_ITER_UDF. - - Returns dict mapping name to (groups, n_cols, arg_offsets_map) where - arg_offsets_map provides offsets per UDF name. - """ - scenarios = {} + def _grouped_agg_arrow_sum(col): + """Sum a single Arrow column.""" + import pyarrow.compute as pc - for name, (num_groups, rows_per_group, n_cols) in { - "few_groups_sm": (50, 5_000, 5), - "few_groups_lg": (50, 50_000, 5), - "many_groups_sm": (2_000, 500, 5), - "many_groups_lg": (500, 10_000, 5), - "wide_cols": (200, 5_000, 20), - }.items(): - groups = MockDataFactory.make_agg_arrow_groups(num_groups, rows_per_group, n_cols) - scenarios[name] = (groups, n_cols) + return pc.sum(col).as_py() - return scenarios + def _grouped_agg_arrow_mean_multi(col0, col1): + """Mean of two Arrow columns combined.""" + import pyarrow.compute as pc + return (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) -class _GroupedAggArrowBenchMixin: - """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_UDF.""" + def _build_scenarios(): + """Build scenarios for SQL_GROUPED_AGG_ARROW_UDF / AGG_ARROW_ITER_UDF. - _scenarios = _build_grouped_agg_arrow_scenarios() + Returns dict mapping name to (groups, n_cols, arg_offsets_map) where + arg_offsets_map provides offsets per UDF name. + """ + scenarios = {} + + for name, (num_groups, rows_per_group, n_cols) in { + "few_groups_sm": (50, 5_000, 5), + "few_groups_lg": (50, 50_000, 5), + "many_groups_sm": (2_000, 500, 5), + "many_groups_lg": (500, 10_000, 5), + "wide_cols": (200, 5_000, 20), + }.items(): + groups, _ = MockDataFactory.make_batch_groups( + num_groups=num_groups, + num_rows=rows_per_group, + num_cols=n_cols, + spark_types=MockDataFactory.NUMERIC_TYPES, + batch_size=rows_per_group, + ) + scenarios[name] = (groups, n_cols) + + return scenarios + + _scenarios = _build_scenarios() _udfs = { "sum_udf": _grouped_agg_arrow_sum, "mean_multi_udf": _grouped_agg_arrow_mean_multi, @@ -654,13 +493,13 @@ def _write_scenario(self, scenario, udf_name, buf): return_type = DoubleType() - def write_command(b): - MockProtocolWriter.build_udf_payload(udf_func, return_type, arg_offsets, b) + def write_udf(b): + MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF, - write_command, - lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), + write_udf, + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) @@ -676,6 +515,24 @@ class GroupedAggArrowUDFPeakmemBench(_GroupedAggArrowBenchMixin, _PeakmemBenchBa class _GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin): """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_ITER_UDF.""" + def _grouped_agg_arrow_iter_sum(batch_iter): + """Sum across batches via iterator.""" + import pyarrow.compute as pc + + total = 0 + for col in batch_iter: + total += pc.sum(col).as_py() or 0 + return total + + def _grouped_agg_arrow_iter_mean_multi(batch_iter): + """Mean across batches of tuples via iterator.""" + import pyarrow.compute as pc + + total = 0.0 + for col0, col1 in batch_iter: + total += (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) + return total + _udfs = { "sum_udf": _grouped_agg_arrow_iter_sum, "mean_multi_udf": _grouped_agg_arrow_iter_mean_multi, @@ -695,13 +552,13 @@ def _write_scenario(self, scenario, udf_name, buf): return_type = DoubleType() - def write_command(b): - MockProtocolWriter.build_udf_payload(udf_func, return_type, arg_offsets, b) + def write_udf(b): + MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF, - write_command, - lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), + write_udf, + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) @@ -715,55 +572,61 @@ class GroupedAggArrowIterUDFPeakmemBench(_GroupedAggArrowIterBenchMixin, _Peakme # -- SQL_GROUPED_MAP_ARROW_UDF ------------------------------------------------ -# UDF receives (key: pa.RecordBatch, values: Iterator[pa.RecordBatch]), -# returns Iterator[pa.RecordBatch]. - - -def _grouped_map_arrow_identity(table): - """Identity grouped map UDF: takes a pa.Table, returns it as-is.""" - return table - - -def _grouped_map_arrow_sort(table): - """Sort by first column.""" - return table.sort_by([(table.column_names[0], "ascending")]) +# UDF receives ``pa.Table``, returns ``pa.Table``. -def _grouped_map_arrow_filter(table): - """Filter rows where first column is valid.""" - import pyarrow.compute as pc - - return table.filter(pc.is_valid(table.column(0))) - +class _GroupedMapArrowBenchMixin: + """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_UDF.""" -def _build_grouped_map_arrow_scenarios(): - """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF. + def _grouped_map_arrow_identity(table): + """Identity grouped map UDF: takes a pa.Table, returns it as-is.""" + return table - Returns dict mapping name to (groups, return_type, arg_offsets). - """ - scenarios = {} - - for name, (num_groups, rows_per_group, num_key_cols, num_value_cols) in { - "few_groups_sm": (50, 5_000, 1, 4), - "few_groups_lg": (50, 50_000, 1, 4), - "many_groups_sm": (2_000, 500, 1, 4), - "many_groups_lg": (500, 10_000, 1, 4), - "wide_values": (200, 5_000, 1, 20), - "multi_key": (200, 5_000, 3, 5), - }.items(): - groups, return_type = MockDataFactory.make_grouped_batches( - num_groups, rows_per_group, num_key_cols, num_value_cols - ) - arg_offsets = MockProtocolWriter.make_grouped_arg_offsets(num_key_cols, num_value_cols) - scenarios[name] = (groups, return_type, arg_offsets) + def _grouped_map_arrow_sort(table): + """Sort by first column.""" + return table.sort_by([(table.column_names[0], "ascending")]) - return scenarios + def _grouped_map_arrow_filter(table): + """Filter rows where first column is valid.""" + import pyarrow.compute as pc + return table.filter(pc.is_valid(table.column(0))) -class _GroupedMapArrowBenchMixin: - """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_UDF.""" + def _build_scenarios(): + """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF. - _scenarios = _build_grouped_map_arrow_scenarios() + Returns dict mapping name to (groups, return_type, arg_offsets). + """ + scenarios = {} + + for name, (num_groups, rows_per_group, num_key_cols, num_value_cols) in { + "few_groups_sm": (50, 5_000, 1, 4), + "few_groups_lg": (50, 50_000, 1, 4), + "many_groups_sm": (2_000, 500, 1, 4), + "many_groups_lg": (500, 10_000, 1, 4), + "wide_values": (200, 5_000, 1, 20), + "multi_key": (200, 5_000, 3, 5), + }.items(): + n_fields = num_key_cols + num_value_cols + struct_type = MockDataFactory.make_struct_type( + num_fields=n_fields, + base_types=MockDataFactory.MIXED_TYPES, + ) + groups, schema = MockDataFactory.make_batch_groups( + num_groups=num_groups, + num_rows=rows_per_group, + num_cols=1, + spark_types=[struct_type], + batch_size=rows_per_group, + ) + inner_fields = schema.fields[0].dataType.fields + return_type = StructType(inner_fields[num_key_cols:]) + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols) + scenarios[name] = (groups, return_type, arg_offsets) + + return scenarios + + _scenarios = _build_scenarios() _udfs = { "identity_udf": _grouped_map_arrow_identity, "sort_udf": _grouped_map_arrow_sort, @@ -776,7 +639,7 @@ def _write_scenario(self, scenario, udf_name, buf): groups, return_type, arg_offsets = self._scenarios[scenario] udf_func = self._udfs[udf_name] - def write_command(b): + def write_udf(b): # Grouped map uses a single UDF with grouped arg_offsets write_int(1, b) # num_udfs write_int(len(arg_offsets), b) # num_arg @@ -791,8 +654,8 @@ def write_command(b): MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF, - write_command, - lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), + write_udf, + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) @@ -806,32 +669,27 @@ class GroupedMapArrowUDFPeakmemBench(_GroupedMapArrowBenchMixin, _PeakmemBenchBa # -- SQL_GROUPED_MAP_ARROW_ITER_UDF ------------------------------------------ -# UDF receives Iterator[pa.RecordBatch] per group, -# returns Iterator[pa.RecordBatch]. -# Uses the same wire format and serializer as SQL_GROUPED_MAP_ARROW_UDF. - +# UDF receives ``Iterator[pa.RecordBatch]`` per group, returns ``Iterator[pa.RecordBatch]``. -def _grouped_map_arrow_iter_identity(batches): - """Identity grouped map iter UDF: yields each batch as-is.""" - yield from batches - - -def _grouped_map_arrow_iter_sort(batches): - """Sort each batch by first column.""" - for batch in batches: - yield batch.sort_by([(batch.column_names[0], "ascending")]) +class _GroupedMapArrowIterBenchMixin(_GroupedMapArrowBenchMixin): + """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF.""" -def _grouped_map_arrow_iter_filter(batches): - """Filter rows where first column is valid.""" - import pyarrow.compute as pc + def _grouped_map_arrow_iter_identity(batches): + """Identity grouped map iter UDF: yields each batch as-is.""" + yield from batches - for batch in batches: - yield batch.filter(pc.is_valid(batch.column(0))) + def _grouped_map_arrow_iter_sort(batches): + """Sort each batch by first column.""" + for batch in batches: + yield batch.sort_by([(batch.column_names[0], "ascending")]) + def _grouped_map_arrow_iter_filter(batches): + """Filter rows where first column is valid.""" + import pyarrow.compute as pc -class _GroupedMapArrowIterBenchMixin(_GroupedMapArrowBenchMixin): - """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF.""" + for batch in batches: + yield batch.filter(pc.is_valid(batch.column(0))) _udfs = { "identity_udf": _grouped_map_arrow_iter_identity, @@ -845,7 +703,7 @@ def _write_scenario(self, scenario, udf_name, buf): groups, return_type, arg_offsets = self._scenarios[scenario] udf_func = self._udfs[udf_name] - def write_command(b): + def write_udf(b): write_int(1, b) # num_udfs write_int(len(arg_offsets), b) # num_arg for offset in arg_offsets: @@ -859,8 +717,8 @@ def write_command(b): MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF, - write_command, - lambda b: MockProtocolWriter.write_grouped_arrow_data(groups, num_dfs=1, buf=b), + write_udf, + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) @@ -874,40 +732,45 @@ class GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _Peakme # -- SQL_GROUPED_MAP_PANDAS_UDF ------------------------------------------------ -# UDF receives a ``pandas.DataFrame`` per group, returns a ``pandas.DataFrame``. -# Groups are sent as separate Arrow IPC streams with optional sub-batching -# (``spark.sql.execution.arrow.maxRecordsPerBatch``). - -_MAX_RECORDS_PER_BATCH = 10_000 - - -def _build_grouped_map_pandas_scenarios(): - scenarios = {} - - for name, (rows, n_cols, num_groups, max_rpb) in { - "sm_grp_few_col": (1_000, 5, 200, None), - "sm_grp_many_col": (1_000, 50, 30, None), - "lg_grp_few_col": (100_000, 5, 30, _MAX_RECORDS_PER_BATCH), - "lg_grp_many_col": (100_000, 50, 5, _MAX_RECORDS_PER_BATCH), - }.items(): - batch, schema = MockDataFactory.make_grouped_batch(rows, n_cols) - scenarios[name] = (batch, num_groups, schema, max_rpb) - - # mixed column types, small groups - batch, schema = MockDataFactory.make_mixed_batch(3) - scenarios["mixed_types"] = (batch, 200, schema, None) - - return scenarios +# UDF receives ``pandas.DataFrame`` per group, returns ``pandas.DataFrame``. class _GroupedMapPandasBenchMixin: """Provides ``_write_scenario`` for SQL_GROUPED_MAP_PANDAS_UDF. - Each scenario stores ``(batch, num_groups, schema, max_rpb)``. + Each scenario stores ``(groups, schema)``. Groups are written as separate Arrow IPC streams. """ - _scenarios = _build_grouped_map_pandas_scenarios() + def _build_scenarios(): + scenarios = {} + + for name, (rows, n_cols, num_groups) in { + "sm_grp_few_col": (1_000, 5, 200), + "sm_grp_many_col": (1_000, 50, 30), + "lg_grp_few_col": (100_000, 5, 30), + "lg_grp_many_col": (100_000, 50, 5), + }.items(): + groups, schema = MockDataFactory.make_batch_groups( + num_groups=num_groups, + num_rows=rows, + num_cols=n_cols, + spark_types=MockDataFactory.NUMERIC_TYPES, + ) + scenarios[name] = (groups, schema) + + # mixed column types, small groups + batches, schema = MockDataFactory.make_batches( + num_rows=3, + num_cols=5, + spark_types=MockDataFactory.MIXED_TYPES, + batch_size=3, + ) + scenarios["mixed_types"] = ([(b,) for b in batches] * 200, schema) + + return scenarios + + _scenarios = _build_scenarios() # Each UDF entry: (func, ret_type, n_args). # ret_type=None means "use the input schema" (excluding key columns for n_args=2). # n_args=1 -> func(pdf), n_args=2 -> func(key, pdf). @@ -920,22 +783,21 @@ class _GroupedMapPandasBenchMixin: param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batch, num_groups, schema, max_rpb = self._scenarios[scenario] + groups, schema = self._scenarios[scenario] udf_func, ret_type, n_args = self._udfs[udf_name] if ret_type is None: # 2-arg UDFs receive (key, pdf) where pdf excludes key columns, # so the return schema must also exclude them. ret_type = StructType(schema.fields[n_args - 1 :]) if n_args > 1 else schema n_cols = len(schema.fields) - arg_offsets = MockProtocolWriter.build_grouped_arg_offsets(n_cols, n_keys=n_args - 1) + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(n_args - 1, n_cols - (n_args - 1)) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - lambda b: MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: MockProtocolWriter.write_grouped_arrow_data( - [(batch,)] * num_groups, + lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_grouped_data_payload( + groups, num_dfs=1, buf=b, - max_records_per_batch=max_rpb, ), buf, ) @@ -951,82 +813,113 @@ class GroupedMapPandasUDFPeakmemBench(_GroupedMapPandasBenchMixin, _PeakmemBench # -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------ # UDF receives ``Iterator[pa.RecordBatch]``, returns ``Iterator[pa.RecordBatch]``. -# Used by ``mapInArrow``. - -def _identity_batch_iter(it): - yield from it +class _MapArrowIterBenchMixin: + """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF. -def _sort_batch_iter(it): - import pyarrow.compute as pc + Wraps input batches in a struct column to match the JVM-side wire format + (``flatten_struct`` undoes this). + """ - for batch in it: - indices = pc.sort_indices(batch.column(0)) - yield batch.take(indices) + def _identity_batch_iter(it): + yield from it + def _sort_batch_iter(it): + import pyarrow.compute as pc -def _filter_batch_iter(it): - import pyarrow.compute as pc + for batch in it: + indices = pc.sort_indices(batch.column(0)) + yield batch.take(indices) - for batch in it: - mask = pc.is_valid(batch.column(0)) - yield batch.filter(mask) + def _filter_batch_iter(it): + import pyarrow.compute as pc + for batch in it: + mask = pc.is_valid(batch.column(0)) + yield batch.filter(mask) -def _build_map_arrow_iter_scenarios(): - """Build scenarios for SQL_MAP_ARROW_ITER_UDF. + def _build_map_arrow_iter_scenarios(): + """Build scenarios for SQL_MAP_ARROW_ITER_UDF. - Same data shapes as non-grouped scenarios but with reduced batch counts - to account for the struct wrap/unwrap overhead per batch. - """ - scenarios = {} - - for name, (rows, n_cols, num_batches) in { - "sm_batch_few_col": (1_000, 5, 500), - "sm_batch_many_col": (1_000, 50, 50), - "lg_batch_few_col": (10_000, 5, 500), - "lg_batch_many_col": (10_000, 50, 50), - }.items(): - batch, col0_type = MockDataFactory.make_typed_batch(rows, n_cols) - scenarios[name] = (batch, num_batches, col0_type) - - _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 200 - - for scenario_name, make_array, spark_type in [ - ( - "pure_ints", - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - IntegerType(), - ), - ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()), - ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - ( - "pure_ts", - lambda r: pa.array( - np.arange(0, r, dtype="datetime64[us]"), type=pa.timestamp("us", tz=None) + Same data shapes as non-grouped scenarios but with reduced batch counts + to account for the struct wrap/unwrap overhead per batch. + """ + scenarios = {} + + for name, (num_rows, num_cols, batch_size) in { + "sm_batch_few_col": (500_000, 5, 1_000), + "sm_batch_many_col": (50_000, 50, 1_000), + "lg_batch_few_col": (5_000_000, 5, 10_000), + "lg_batch_many_col": (500_000, 50, 10_000), + }.items(): + struct_type = MockDataFactory.make_struct_type( + num_fields=num_cols, + base_types=MockDataFactory.MIXED_TYPES, + ) + batches, schema = MockDataFactory.make_batches( + num_rows=num_rows, + num_cols=1, + spark_types=[struct_type], + batch_size=batch_size, + ) + col0_type = schema.fields[0].dataType.fields[0].dataType + scenarios[name] = (batches, col0_type) + + _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 1_000_000, 10, 5_000 + + for scenario_name, spark_types in [ + ( + "pure_ints", + [ + ( + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + IntegerType(), + ) + ], ), - TimestampNTZType(), - ), - ]: - batch = MockDataFactory.make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) - scenarios[scenario_name] = (batch, _PURE_BATCHES, spark_type) - - scenarios["mixed_types"] = ( - MockDataFactory.make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], - _PURE_BATCHES, - IntegerType(), - ) - - return scenarios - - -class _MapArrowIterBenchMixin: - """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF. + ("pure_floats", [(lambda r: pa.array(np.random.rand(r)), DoubleType())]), + ("pure_strings", [(lambda r: pa.array([f"s{j}" for j in range(r)]), StringType())]), + ( + "pure_ts", + [ + ( + lambda r: pa.array( + np.arange(0, r, dtype="datetime64[us]"), + type=pa.timestamp("us", tz=None), + ), + TimestampNTZType(), + ) + ], + ), + ]: + struct_type = MockDataFactory.make_struct_type( + num_fields=_NUM_COLS, + base_types=spark_types, + ) + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=1, + spark_types=[struct_type], + batch_size=_BATCH_SIZE, + ) + col0_type = schema.fields[0].dataType.fields[0].dataType + scenarios[scenario_name] = (batches, col0_type) + + struct_type = MockDataFactory.make_struct_type( + num_fields=_NUM_COLS, + base_types=MockDataFactory.MIXED_TYPES, + ) + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=1, + spark_types=[struct_type], + batch_size=_BATCH_SIZE, + ) + col0_type = schema.fields[0].dataType.fields[0].dataType + scenarios["mixed_types"] = (batches, col0_type) - Wraps input batches in a struct column to match the JVM-side wire format - (``flatten_struct`` undoes this). - """ + return scenarios _scenarios = _build_map_arrow_iter_scenarios() _udfs = { @@ -1038,15 +931,14 @@ class _MapArrowIterBenchMixin: param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, col0_type = self._scenarios[scenario] + batches, col0_type = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: ret_type = col0_type - wrapped = MockDataFactory.wrap_batch_in_struct(batch) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_MAP_ARROW_ITER_UDF, - lambda b: MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: MockProtocolWriter.write_arrow_ipc_batches((wrapped for _ in range(num_batches)), b), + lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_data_payload(iter(batches), b), buf, ) @@ -1060,27 +952,90 @@ class MapArrowIterUDFPeakmemBench(_MapArrowIterBenchMixin, _PeakmemBenchBase): # -- SQL_SCALAR_ARROW_UDF --------------------------------------------------- -# UDF receives individual ``pa.Array`` columns, returns a ``pa.Array``. -# All UDFs operate on arg_offsets=[0] so they work with any column type. +# UDF receives ``pa.Array`` columns, returns ``pa.Array``. -def _sort_arrow(c): - import pyarrow.compute as pc +class _ScalarArrowBenchMixin: + """Mixin for SQL_SCALAR_ARROW_UDF benchmarks.""" - return pc.take(c, pc.sort_indices(c)) + def _sort_arrow(c): + import pyarrow.compute as pc + return pc.take(c, pc.sort_indices(c)) -def _nullcheck_arrow(c): - import pyarrow.compute as pc + def _nullcheck_arrow(c): + import pyarrow.compute as pc - return pc.is_valid(c) + return pc.is_valid(c) + def _build_scenarios(): + """Build data-shape scenarios for non-grouped Arrow eval types. -class _ScalarArrowBenchMixin: - """Mixin for SQL_SCALAR_ARROW_UDF benchmarks.""" + Returns a dict mapping scenario name to ``(batches, col0_type)``. + """ + scenarios = {} + + for name, (num_rows, num_cols, batch_size) in { + "sm_batch_few_col": (1_500_000, 5, 1_000), + "sm_batch_many_col": (200_000, 50, 1_000), + "lg_batch_few_col": (35_000_000, 5, 10_000), + "lg_batch_many_col": (4_000_000, 50, 10_000), + }.items(): + batches, schema = MockDataFactory.make_batches( + num_rows=num_rows, + num_cols=num_cols, + spark_types=MockDataFactory.MIXED_TYPES, + batch_size=batch_size, + ) + scenarios[name] = (batches, schema.fields[0].dataType) + + _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 5_000_000, 10, 5_000 + + for scenario_name, spark_types in [ + ( + "pure_ints", + [ + ( + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + IntegerType(), + ) + ], + ), + ("pure_floats", [(lambda r: pa.array(np.random.rand(r)), DoubleType())]), + ("pure_strings", [(lambda r: pa.array([f"s{j}" for j in range(r)]), StringType())]), + ( + "pure_ts", + [ + ( + lambda r: pa.array( + np.arange(0, r, dtype="datetime64[us]"), + type=pa.timestamp("us", tz=None), + ), + TimestampNTZType(), + ) + ], + ), + ]: + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_types=spark_types, + batch_size=_BATCH_SIZE, + ) + scenarios[scenario_name] = (batches, schema.fields[0].dataType) + + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_types=MockDataFactory.MIXED_TYPES, + batch_size=_BATCH_SIZE, + ) + scenarios["mixed_types"] = (batches, schema.fields[0].dataType) + + return scenarios _eval_type = PythonEvalType.SQL_SCALAR_ARROW_UDF - _scenarios = _build_non_grouped_scenarios() + _scenarios = _build_scenarios() # ret_type=None means "use col0_type from the scenario" _udfs = { "identity_udf": (lambda c: c, None, [0]), @@ -1091,14 +1046,14 @@ class _ScalarArrowBenchMixin: param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, col0_type = self._scenarios[scenario] + batches, col0_type = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: ret_type = col0_type MockProtocolWriter.write_worker_input( self._eval_type, - lambda b: MockProtocolWriter.build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: MockProtocolWriter.write_arrow_ipc_batches((batch for _ in range(num_batches)), b), + lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_data_payload(iter(batches), b), buf, ) @@ -1115,26 +1070,23 @@ class ScalarArrowUDFPeakmemBench(_ScalarArrowBenchMixin, _PeakmemBenchBase): # UDF receives ``Iterator[pa.Array]``, returns ``Iterator[pa.Array]``. -def _identity_iter(it): - return (c for c in it) - - -def _sort_iter(it): - import pyarrow.compute as pc - - for c in it: - yield pc.take(c, pc.sort_indices(c)) +class _ScalarArrowIterBenchMixin(_ScalarArrowBenchMixin): + """Mixin for SQL_SCALAR_ARROW_ITER_UDF benchmarks.""" + def _identity_iter(it): + return (c for c in it) -def _nullcheck_iter(it): - import pyarrow.compute as pc + def _sort_iter(it): + import pyarrow.compute as pc - for c in it: - yield pc.is_valid(c) + for c in it: + yield pc.take(c, pc.sort_indices(c)) + def _nullcheck_iter(it): + import pyarrow.compute as pc -class _ScalarArrowIterBenchMixin(_ScalarArrowBenchMixin): - """Mixin for SQL_SCALAR_ARROW_ITER_UDF benchmarks.""" + for c in it: + yield pc.is_valid(c) _eval_type = PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF _udfs = { From 81a1a67ce0cd31fa569fbaad5883fe7c3446165f Mon Sep 17 00:00:00 2001 From: Yicong-Huang <17627829+Yicong-Huang@users.noreply.github.com> Date: Thu, 26 Mar 2026 17:42:17 -0700 Subject: [PATCH 4/4] refactor: unify scenario tuples, write_udf_payload, and TYPE_REGISTRY --- python/benchmarks/bench_eval_type.py | 181 ++++++++++++--------------- 1 file changed, 83 insertions(+), 98 deletions(-) diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py index 091fd7868374..74449db51d41 100644 --- a/python/benchmarks/bench_eval_type.py +++ b/python/benchmarks/bench_eval_type.py @@ -165,17 +165,21 @@ class MockDataFactory: MAX_RECORDS_PER_BATCH = 10_000 - MIXED_TYPES = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - ] + TYPE_REGISTRY: dict[str, tuple[Callable, Any]] = { + "int": (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int32)), IntegerType()), + "double": (lambda r: pa.array(np.random.rand(r)), DoubleType()), + "string": (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), + "binary": (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), + "boolean": (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), + } - NUMERIC_TYPES = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), - (lambda r: pa.array(np.random.rand(r)), DoubleType()), + MIXED_TYPES = [ + TYPE_REGISTRY["int"], + TYPE_REGISTRY["string"], + TYPE_REGISTRY["binary"], + TYPE_REGISTRY["boolean"], ] + NUMERIC_TYPES = [TYPE_REGISTRY["int"], TYPE_REGISTRY["double"]] @classmethod def make_struct_type( @@ -187,7 +191,7 @@ def make_struct_type( """Compose flat ``base_types`` into a single struct type pool entry. Returns ``(factory_fn, StructType)`` suitable for inclusion in a - ``spark_types`` list. The factory produces a ``pa.StructArray`` whose + ``spark_type_pool`` list. The factory produces a ``pa.StructArray`` whose sub-fields cycle through ``base_types``. """ fields = [ @@ -207,23 +211,26 @@ def make_batches( *, num_rows: int, num_cols: int, - spark_types: list[tuple[Callable, Any]], batch_size: int = MAX_RECORDS_PER_BATCH, + spark_type_pool: list[tuple[Callable, Any]], ) -> tuple[list[pa.RecordBatch], StructType]: - """Create RecordBatches with columns cycling through ``spark_types``. + """Create RecordBatches with columns cycling through ``spark_type_pool``. Splits ``num_rows`` into batches of at most ``batch_size`` rows. - Each ``spark_types`` entry is ``(factory_fn, SparkType)``; if an entry - produces a ``pa.StructArray``, it becomes a struct column naturally. + Each pool entry is ``(factory_fn, SparkType)``; if an entry produces + a ``pa.StructArray``, it becomes a struct column naturally. """ batches = [] for offset in range(0, num_rows, batch_size): rows = min(batch_size, num_rows - offset) - arrays = [spark_types[i % len(spark_types)][0](rows) for i in range(num_cols)] + arrays = [spark_type_pool[i % len(spark_type_pool)][0](rows) for i in range(num_cols)] names = [f"col_{i}" for i in range(num_cols)] batches.append(pa.RecordBatch.from_arrays(arrays, names=names)) schema = StructType( - [StructField(f"col_{i}", spark_types[i % len(spark_types)][1]) for i in range(num_cols)] + [ + StructField(f"col_{i}", spark_type_pool[i % len(spark_type_pool)][1]) + for i in range(num_cols) + ] ) return batches, schema @@ -234,8 +241,8 @@ def make_batch_groups( num_groups: int, num_rows: int, num_cols: int, - spark_types: list[tuple[Callable, Any]], batch_size: int = MAX_RECORDS_PER_BATCH, + spark_type_pool: list[tuple[Callable, Any]], ) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]: """Create groups of batches. @@ -246,13 +253,16 @@ def make_batch_groups( batches, _ = cls.make_batches( num_rows=num_rows, num_cols=num_cols, - spark_types=spark_types, batch_size=batch_size, + spark_type_pool=spark_type_pool, ) groups.append(tuple(batches)) schema = StructType( - [StructField(f"col_{i}", spark_types[i % len(spark_types)][1]) for i in range(num_cols)] + [ + StructField(f"col_{i}", spark_type_pool[i % len(spark_type_pool)][1]) + for i in range(num_cols) + ] ) return groups, schema @@ -333,10 +343,7 @@ class _ArrowBatchedBenchMixin: def _build_scenarios(): """Build scenarios for SQL_ARROW_BATCHED_UDF. - Returns a dict mapping scenario name to - ``(batches, input_struct_type, col0_type)``. - ``input_struct_type`` is a StructType matching the batch schema, - needed for the wire protocol. + Returns a dict mapping scenario name to ``(batches, schema)``. """ scenarios = {} @@ -351,10 +358,10 @@ def _build_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=num_rows, num_cols=num_cols, - spark_types=MockDataFactory.MIXED_TYPES, + spark_type_pool=MockDataFactory.MIXED_TYPES, batch_size=batch_size, ) - scenarios[name] = (batches, schema, schema.fields[0].dataType) + scenarios[name] = (batches, schema) _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 50_000, 10, 5_000 @@ -374,19 +381,19 @@ def _build_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=_NUM_ROWS, num_cols=_NUM_COLS, - spark_types=spark_types, + spark_type_pool=spark_types, batch_size=_BATCH_SIZE, ) - scenarios[scenario_name] = (batches, schema, schema.fields[0].dataType) + scenarios[scenario_name] = (batches, schema) # mixed types batches, schema = MockDataFactory.make_batches( num_rows=_NUM_ROWS, num_cols=_NUM_COLS, - spark_types=MockDataFactory.MIXED_TYPES, + spark_type_pool=MockDataFactory.MIXED_TYPES, batch_size=_BATCH_SIZE, ) - scenarios["mixed_types"] = (batches, schema, schema.fields[0].dataType) + scenarios["mixed_types"] = (batches, schema) return scenarios @@ -402,14 +409,13 @@ def _build_scenarios(): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batches, input_struct, col0_type = self._scenarios[scenario] + batches, schema = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: - ret_type = col0_type + ret_type = schema.fields[0].dataType def write_udf(b): - # input_type is read before UDF payloads for ARROW_BATCHED_UDF - MockProtocolWriter.write_utf8(input_struct.json(), b) + MockProtocolWriter.write_utf8(schema.json(), b) MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b) MockProtocolWriter.write_worker_input( @@ -450,8 +456,7 @@ def _grouped_agg_arrow_mean_multi(col0, col1): def _build_scenarios(): """Build scenarios for SQL_GROUPED_AGG_ARROW_UDF / AGG_ARROW_ITER_UDF. - Returns dict mapping name to (groups, n_cols, arg_offsets_map) where - arg_offsets_map provides offsets per UDF name. + Returns a dict mapping scenario name to ``(groups, schema)``. """ scenarios = {} @@ -462,14 +467,14 @@ def _build_scenarios(): "many_groups_lg": (500, 10_000, 5), "wide_cols": (200, 5_000, 20), }.items(): - groups, _ = MockDataFactory.make_batch_groups( + groups, schema = MockDataFactory.make_batch_groups( num_groups=num_groups, num_rows=rows_per_group, num_cols=n_cols, - spark_types=MockDataFactory.NUMERIC_TYPES, + spark_type_pool=MockDataFactory.NUMERIC_TYPES, batch_size=rows_per_group, ) - scenarios[name] = (groups, n_cols) + scenarios[name] = (groups, schema) return scenarios @@ -482,7 +487,7 @@ def _build_scenarios(): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - groups, n_cols = self._scenarios[scenario] + groups, _schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] # sum_udf uses 1 arg, mean_multi_udf uses 2 args @@ -541,7 +546,7 @@ def _grouped_agg_arrow_iter_mean_multi(batch_iter): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - groups, n_cols = self._scenarios[scenario] + groups, _schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] # sum_udf uses 1 arg, mean_multi_udf uses 2 args @@ -595,7 +600,9 @@ def _grouped_map_arrow_filter(table): def _build_scenarios(): """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF. - Returns dict mapping name to (groups, return_type, arg_offsets). + Returns a dict mapping scenario name to ``(groups, schema)``. + ``schema`` is the value-only return type (excluding key columns). + ``arg_offsets`` are derived at write time from data and schema. """ scenarios = {} @@ -616,13 +623,12 @@ def _build_scenarios(): num_groups=num_groups, num_rows=rows_per_group, num_cols=1, - spark_types=[struct_type], + spark_type_pool=[struct_type], batch_size=rows_per_group, ) inner_fields = schema.fields[0].dataType.fields return_type = StructType(inner_fields[num_key_cols:]) - arg_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, num_value_cols) - scenarios[name] = (groups, return_type, arg_offsets) + scenarios[name] = (groups, return_type) return scenarios @@ -636,25 +642,15 @@ def _build_scenarios(): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - groups, return_type, arg_offsets = self._scenarios[scenario] + groups, schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] - - def write_udf(b): - # Grouped map uses a single UDF with grouped arg_offsets - write_int(1, b) # num_udfs - write_int(len(arg_offsets), b) # num_arg - for offset in arg_offsets: - write_int(offset, b) - MockProtocolWriter.write_bool(False, b) # is_kwarg - write_int(1, b) # num_chained - command = cloudpickle_dumps((udf_func, return_type)) - write_int(len(command), b) - b.write(command) - write_long(0, b) # result_id - + n_total = groups[0][0].column(0).type.num_fields + n_values = len(schema.fields) + num_key_cols = n_total - n_values + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, n_values) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF, - write_udf, + lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b), lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) @@ -700,24 +696,15 @@ def _grouped_map_arrow_iter_filter(batches): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - groups, return_type, arg_offsets = self._scenarios[scenario] + groups, schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] - - def write_udf(b): - write_int(1, b) # num_udfs - write_int(len(arg_offsets), b) # num_arg - for offset in arg_offsets: - write_int(offset, b) - MockProtocolWriter.write_bool(False, b) # is_kwarg - write_int(1, b) # num_chained - command = cloudpickle_dumps((udf_func, return_type)) - write_int(len(command), b) - b.write(command) - write_long(0, b) # result_id - + n_total = groups[0][0].column(0).type.num_fields + n_values = len(schema.fields) + num_key_cols = n_total - n_values + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, n_values) MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF, - write_udf, + lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b), lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) @@ -755,7 +742,7 @@ def _build_scenarios(): num_groups=num_groups, num_rows=rows, num_cols=n_cols, - spark_types=MockDataFactory.NUMERIC_TYPES, + spark_type_pool=MockDataFactory.NUMERIC_TYPES, ) scenarios[name] = (groups, schema) @@ -763,7 +750,7 @@ def _build_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=3, num_cols=5, - spark_types=MockDataFactory.MIXED_TYPES, + spark_type_pool=MockDataFactory.MIXED_TYPES, batch_size=3, ) scenarios["mixed_types"] = ([(b,) for b in batches] * 200, schema) @@ -842,6 +829,7 @@ def _filter_batch_iter(it): def _build_map_arrow_iter_scenarios(): """Build scenarios for SQL_MAP_ARROW_ITER_UDF. + Returns a dict mapping scenario name to ``(batches, schema)``. Same data shapes as non-grouped scenarios but with reduced batch counts to account for the struct wrap/unwrap overhead per batch. """ @@ -860,11 +848,10 @@ def _build_map_arrow_iter_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=num_rows, num_cols=1, - spark_types=[struct_type], + spark_type_pool=[struct_type], batch_size=batch_size, ) - col0_type = schema.fields[0].dataType.fields[0].dataType - scenarios[name] = (batches, col0_type) + scenarios[name] = (batches, schema) _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 1_000_000, 10, 5_000 @@ -900,11 +887,10 @@ def _build_map_arrow_iter_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=_NUM_ROWS, num_cols=1, - spark_types=[struct_type], + spark_type_pool=[struct_type], batch_size=_BATCH_SIZE, ) - col0_type = schema.fields[0].dataType.fields[0].dataType - scenarios[scenario_name] = (batches, col0_type) + scenarios[scenario_name] = (batches, schema) struct_type = MockDataFactory.make_struct_type( num_fields=_NUM_COLS, @@ -913,11 +899,10 @@ def _build_map_arrow_iter_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=_NUM_ROWS, num_cols=1, - spark_types=[struct_type], + spark_type_pool=[struct_type], batch_size=_BATCH_SIZE, ) - col0_type = schema.fields[0].dataType.fields[0].dataType - scenarios["mixed_types"] = (batches, col0_type) + scenarios["mixed_types"] = (batches, schema) return scenarios @@ -931,10 +916,10 @@ def _build_map_arrow_iter_scenarios(): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batches, col0_type = self._scenarios[scenario] + batches, schema = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: - ret_type = col0_type + ret_type = schema.fields[0].dataType.fields[0].dataType MockProtocolWriter.write_worker_input( PythonEvalType.SQL_MAP_ARROW_ITER_UDF, lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), @@ -971,7 +956,7 @@ def _nullcheck_arrow(c): def _build_scenarios(): """Build data-shape scenarios for non-grouped Arrow eval types. - Returns a dict mapping scenario name to ``(batches, col0_type)``. + Returns a dict mapping scenario name to ``(batches, schema)``. """ scenarios = {} @@ -984,10 +969,10 @@ def _build_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=num_rows, num_cols=num_cols, - spark_types=MockDataFactory.MIXED_TYPES, + spark_type_pool=MockDataFactory.MIXED_TYPES, batch_size=batch_size, ) - scenarios[name] = (batches, schema.fields[0].dataType) + scenarios[name] = (batches, schema) _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 5_000_000, 10, 5_000 @@ -1019,24 +1004,24 @@ def _build_scenarios(): batches, schema = MockDataFactory.make_batches( num_rows=_NUM_ROWS, num_cols=_NUM_COLS, - spark_types=spark_types, + spark_type_pool=spark_types, batch_size=_BATCH_SIZE, ) - scenarios[scenario_name] = (batches, schema.fields[0].dataType) + scenarios[scenario_name] = (batches, schema) batches, schema = MockDataFactory.make_batches( num_rows=_NUM_ROWS, num_cols=_NUM_COLS, - spark_types=MockDataFactory.MIXED_TYPES, + spark_type_pool=MockDataFactory.MIXED_TYPES, batch_size=_BATCH_SIZE, ) - scenarios["mixed_types"] = (batches, schema.fields[0].dataType) + scenarios["mixed_types"] = (batches, schema) return scenarios _eval_type = PythonEvalType.SQL_SCALAR_ARROW_UDF _scenarios = _build_scenarios() - # ret_type=None means "use col0_type from the scenario" + # ret_type=None means "use schema.fields[0].dataType from the scenario" _udfs = { "identity_udf": (lambda c: c, None, [0]), "sort_udf": (_sort_arrow, None, [0]), @@ -1046,10 +1031,10 @@ def _build_scenarios(): param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batches, col0_type = self._scenarios[scenario] + batches, schema = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: - ret_type = col0_type + ret_type = schema.fields[0].dataType MockProtocolWriter.write_worker_input( self._eval_type, lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b),