diff --git a/python/benchmarks/bench_eval_type.py b/python/benchmarks/bench_eval_type.py index a489d5d15ce5..74449db51d41 100644 --- a/python/benchmarks/bench_eval_type.py +++ b/python/benchmarks/bench_eval_type.py @@ -34,6 +34,8 @@ import numpy as np import pyarrow as pa +np.random.seed(42) + from pyspark.cloudpickle import dumps as cloudpickle_dumps from pyspark.serializers import write_int, write_long from pyspark.sql.types import ( @@ -41,7 +43,6 @@ BooleanType, DoubleType, IntegerType, - LongType, StringType, StructField, StructType, @@ -50,176 +51,232 @@ from pyspark.util import PythonEvalType from pyspark.worker import main as worker_main -# --------------------------------------------------------------------------- -# Wire-format helpers -# --------------------------------------------------------------------------- - - -def _write_utf8(s: str, buf: io.BytesIO) -> None: - """Write a length-prefixed UTF-8 string (matches ``UTF8Deserializer.loads``).""" - encoded = s.encode("utf-8") - write_int(len(encoded), buf) - buf.write(encoded) - - -def _write_bool(val: bool, buf: io.BytesIO) -> None: - buf.write(struct.pack("!?", val)) - # --------------------------------------------------------------------------- -# Worker protocol builder +# Mock helpers: protocol writer, data factory, UDF factory # --------------------------------------------------------------------------- -def _build_preamble(buf: io.BytesIO) -> None: - """Write everything ``main()`` reads before ``eval_type``.""" - write_int(0, buf) # split_index - _write_utf8(f"{sys.version_info[0]}.{sys.version_info[1]}", buf) # python version - _write_utf8( - json.dumps( - { - "isBarrier": False, - "stageId": 0, - "partitionId": 0, - "attemptNumber": 0, - "taskAttemptId": 0, - "cpus": 1, - "resources": {}, - "localProperties": {}, - } - ), - buf, - ) - _write_utf8("/tmp", buf) # spark_files_dir - write_int(0, buf) # num_python_includes - _write_bool(False, buf) # needs_broadcast_decryption_server - write_int(0, buf) # num_broadcast_variables - - -def _build_udf_payload( - udf_func: Callable[..., Any], - return_type: StructType, - arg_offsets: list[int], - buf: io.BytesIO, -) -> None: - """Write the ``read_single_udf`` portion of the protocol.""" - write_int(1, buf) # num_udfs - write_int(len(arg_offsets), buf) # num_arg - for offset in arg_offsets: - write_int(offset, buf) - _write_bool(False, buf) # is_kwarg - write_int(1, buf) # num_chained - command = cloudpickle_dumps((udf_func, return_type)) - write_int(len(command), buf) - buf.write(command) - write_long(0, buf) # result_id - - -def _write_arrow_ipc_batches(batch_iter: Iterator[pa.RecordBatch], buf: io.BufferedIOBase) -> None: - """Write a plain Arrow IPC stream from an iterator of Arrow batches.""" - first_batch = next(batch_iter) - writer = pa.RecordBatchStreamWriter(buf, first_batch.schema) - writer.write_batch(first_batch) - for batch in batch_iter: - writer.write_batch(batch) - writer.close() - - -def _write_worker_input( - eval_type: int, - write_command: Callable[[io.BufferedIOBase], None], - write_data: Callable[[io.BufferedIOBase], None], - buf: io.BufferedIOBase, -) -> None: - """Write the full worker binary stream: preamble + command + data + end. - - This is the general skeleton shared by all eval types. Callers provide - *write_command* (e.g. ``_build_udf_payload``) and *write_data* - (e.g. ``_write_arrow_ipc_batches``) to plug in protocol specifics. 
- """ - _build_preamble(buf) - write_int(eval_type, buf) - write_int(0, buf) # RunnerConf (0 key-value pairs) - write_int(0, buf) # EvalConf (0 key-value pairs) - write_command(buf) - write_data(buf) - write_int(-4, buf) # SpecialLengths.END_OF_STREAM - - -def _run_worker_from_replayed_file(write_input: Callable[[io.BufferedIOBase], None]) -> None: - """Write input to a temp file, then replay it through ``worker_main``.""" - fd, path = tempfile.mkstemp(prefix="spark-bench-replay-", suffix=".bin") - try: - with os.fdopen(fd, "w+b") as infile: - write_input(infile) - infile.flush() - infile.seek(0) - worker_main(infile, io.BytesIO()) - finally: - try: - os.remove(path) - except FileNotFoundError: - pass - - -def _make_typed_batch(rows: int, n_cols: int) -> tuple[pa.RecordBatch, IntegerType]: - """Columns cycling through int64, string, binary, boolean — reflects realistic serde costs.""" - type_cycle = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), IntegerType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - ] - arrays = [type_cycle[i % len(type_cycle)][0](rows) for i in range(n_cols)] - fields = [StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) for i in range(n_cols)] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - IntegerType(), - ) - - -def _make_grouped_batch(rows_per_group, n_cols): - """``group_key (int64)`` + ``(n_cols - 1)`` float32 value columns.""" - arrays = [pa.array(np.zeros(rows_per_group, dtype=np.int64))] + [ - pa.array(np.random.rand(rows_per_group).astype(np.float32)) for _ in range(n_cols - 1) - ] - fields = [StructField("group_key", IntegerType())] + [ - StructField(f"val_{i}", DoubleType()) for i in range(n_cols - 1) - ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - StructType(fields), - ) - - -def _make_mixed_batch(rows_per_group): - """``id``, ``str_col``, ``float_col``, ``double_col``, ``long_col``.""" - arrays = [ - pa.array(np.zeros(rows_per_group, dtype=np.int64)), - pa.array([f"s{j}" for j in range(rows_per_group)]), - pa.array(np.random.rand(rows_per_group).astype(np.float32)), - pa.array(np.random.rand(rows_per_group)), - pa.array(np.zeros(rows_per_group, dtype=np.int64)), - ] - fields = [ - StructField("id", IntegerType()), - StructField("str_col", StringType()), - StructField("float_col", DoubleType()), - StructField("double_col", DoubleType()), - StructField("long_col", IntegerType()), +class MockProtocolWriter: + """Constructs the binary wire protocol that ``worker.py``'s ``main()`` expects.""" + + @classmethod + def write_bool(cls, val: bool, buf: io.BytesIO) -> None: + buf.write(struct.pack("!?", val)) + + @classmethod + def write_data_payload( + cls, batch_iter: Iterator[pa.RecordBatch], buf: io.BufferedIOBase + ) -> None: + """Write a plain Arrow IPC stream from an iterator of Arrow batches.""" + first_batch = next(batch_iter) + writer = pa.RecordBatchStreamWriter(buf, first_batch.schema) + writer.write_batch(first_batch) + for batch in batch_iter: + writer.write_batch(batch) + writer.close() + + @classmethod + def write_grouped_data_payload( + cls, + groups: list[tuple[pa.RecordBatch, ...]], + num_dfs: int, + buf: io.BufferedIOBase, + ) -> None: + """Write grouped Arrow data in the wire protocol expected by _load_group_dataframes.""" + for group in groups: + write_int(num_dfs, 
buf) + for df_batch in group: + cls.write_data_payload(iter([df_batch]), buf) + write_int(0, buf) # end of groups + + @classmethod + def write_preamble(cls, buf: io.BytesIO) -> None: + """Write everything ``main()`` reads before ``eval_type``.""" + write_int(0, buf) # split_index + cls.write_utf8(f"{sys.version_info[0]}.{sys.version_info[1]}", buf) # python version + cls.write_utf8( + json.dumps( + { + "isBarrier": False, + "stageId": 0, + "partitionId": 0, + "attemptNumber": 0, + "taskAttemptId": 0, + "cpus": 1, + "resources": {}, + "localProperties": {}, + } + ), + buf, + ) + cls.write_utf8("/tmp", buf) # spark_files_dir + write_int(0, buf) # num_python_includes + cls.write_bool(False, buf) # needs_broadcast_decryption_server + write_int(0, buf) # num_broadcast_variables + + @classmethod + def write_udf_payload( + cls, + udf_func: Callable[..., Any], + return_type: StructType, + arg_offsets: list[int], + buf: io.BytesIO, + ) -> None: + """Write the ``read_single_udf`` portion of the protocol.""" + write_int(1, buf) # num_udfs + write_int(len(arg_offsets), buf) # num_arg + for offset in arg_offsets: + write_int(offset, buf) + cls.write_bool(False, buf) # is_kwarg + write_int(1, buf) # num_chained + command = cloudpickle_dumps((udf_func, return_type)) + write_int(len(command), buf) + buf.write(command) + write_long(0, buf) # result_id + + @classmethod + def write_utf8(cls, s: str, buf: io.BytesIO) -> None: + """Write a length-prefixed UTF-8 string (matches ``UTF8Deserializer.loads``).""" + encoded = s.encode("utf-8") + write_int(len(encoded), buf) + buf.write(encoded) + + @classmethod + def write_worker_input( + cls, + eval_type: int, + write_udf: Callable[[io.BufferedIOBase], None], + write_data: Callable[[io.BufferedIOBase], None], + buf: io.BufferedIOBase, + ) -> None: + """Write the full worker binary stream: preamble + command + data + end.""" + cls.write_preamble(buf) + write_int(eval_type, buf) + write_int(0, buf) # RunnerConf (0 key-value pairs) + write_int(0, buf) # EvalConf (0 key-value pairs) + write_udf(buf) + write_data(buf) + write_int(-4, buf) # SpecialLengths.END_OF_STREAM + + +class MockDataFactory: + """Creates mock Arrow batches and group structures for benchmarks.""" + + MAX_RECORDS_PER_BATCH = 10_000 + + TYPE_REGISTRY: dict[str, tuple[Callable, Any]] = { + "int": (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int32)), IntegerType()), + "double": (lambda r: pa.array(np.random.rand(r)), DoubleType()), + "string": (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), + "binary": (lambda r: pa.array([f"b{j}".encode() for j in range(r)]), BinaryType()), + "boolean": (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), + } + + MIXED_TYPES = [ + TYPE_REGISTRY["int"], + TYPE_REGISTRY["string"], + TYPE_REGISTRY["binary"], + TYPE_REGISTRY["boolean"], ] - return ( - pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]), - StructType(fields), - ) + NUMERIC_TYPES = [TYPE_REGISTRY["int"], TYPE_REGISTRY["double"]] + + @classmethod + def make_struct_type( + cls, + *, + num_fields: int, + base_types: list[tuple[Callable, Any]], + ) -> tuple[Callable, StructType]: + """Compose flat ``base_types`` into a single struct type pool entry. + + Returns ``(factory_fn, StructType)`` suitable for inclusion in a + ``spark_type_pool`` list. The factory produces a ``pa.StructArray`` whose + sub-fields cycle through ``base_types``. 
+ """ + fields = [ + StructField(f"col_{i}", base_types[i % len(base_types)][1]) for i in range(num_fields) + ] + + def factory(r): + arrays = [base_types[i % len(base_types)][0](r) for i in range(num_fields)] + names = [f.name for f in fields] + return pa.StructArray.from_arrays(arrays, names=names) + + return (factory, StructType(fields)) + + @classmethod + def make_batches( + cls, + *, + num_rows: int, + num_cols: int, + batch_size: int = MAX_RECORDS_PER_BATCH, + spark_type_pool: list[tuple[Callable, Any]], + ) -> tuple[list[pa.RecordBatch], StructType]: + """Create RecordBatches with columns cycling through ``spark_type_pool``. + + Splits ``num_rows`` into batches of at most ``batch_size`` rows. + Each pool entry is ``(factory_fn, SparkType)``; if an entry produces + a ``pa.StructArray``, it becomes a struct column naturally. + """ + batches = [] + for offset in range(0, num_rows, batch_size): + rows = min(batch_size, num_rows - offset) + arrays = [spark_type_pool[i % len(spark_type_pool)][0](rows) for i in range(num_cols)] + names = [f"col_{i}" for i in range(num_cols)] + batches.append(pa.RecordBatch.from_arrays(arrays, names=names)) + schema = StructType( + [ + StructField(f"col_{i}", spark_type_pool[i % len(spark_type_pool)][1]) + for i in range(num_cols) + ] + ) + return batches, schema + + @classmethod + def make_batch_groups( + cls, + *, + num_groups: int, + num_rows: int, + num_cols: int, + batch_size: int = MAX_RECORDS_PER_BATCH, + spark_type_pool: list[tuple[Callable, Any]], + ) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]: + """Create groups of batches. + + Each group has ``num_rows`` total rows, split into batches of ``batch_size``. + """ + groups = [] + for _ in range(num_groups): + batches, _ = cls.make_batches( + num_rows=num_rows, + num_cols=num_cols, + batch_size=batch_size, + spark_type_pool=spark_type_pool, + ) + groups.append(tuple(batches)) + + schema = StructType( + [ + StructField(f"col_{i}", spark_type_pool[i % len(spark_type_pool)][1]) + for i in range(num_cols) + ] + ) + return groups, schema + +class MockUDFFactory: + """Constructs UDF command payloads for the worker protocol.""" -def _build_grouped_arg_offsets(n_cols, n_keys=0): - """``[len, num_keys, key_col_0, ..., val_col_0, ...]``""" - keys = list(range(n_keys)) - vals = list(range(n_keys, n_cols)) - offsets = [n_keys] + keys + vals - return [len(offsets)] + offsets + @classmethod + def make_grouped_arg_offsets(cls, num_key_cols: int, num_value_cols: int) -> list[int]: + """Build arg_offsets: ``[total_len, num_keys, key_idx..., value_idx...]``.""" + key_idxs = list(range(num_key_cols)) + value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols)) + offsets = [num_key_cols] + key_idxs + value_idxs + return [len(offsets)] + offsets # --------------------------------------------------------------------------- @@ -258,414 +315,179 @@ def setup(self, *args): self._args = args def peakmem_worker(self, *args): - _run_worker_from_replayed_file(lambda buf: self._write_scenario(*self._args, buf)) - - -# --------------------------------------------------------------------------- -# Non-grouped Arrow UDF benchmarks -# --------------------------------------------------------------------------- - -# Data-shape scenarios shared by all non-grouped eval types. -# Each entry maps to a ``(batch, num_batches, col0_type)`` tuple; the UDF is -# selected independently via the ``udf`` ASV parameter. 
-# ``col0_type`` is the Spark type of column 0, used as the default UDF return -# type when the UDF declaration leaves it as ``None``. - - -def _make_pure_batch(rows, n_cols, make_array, spark_type): - """Create a batch where all columns share the same Arrow type.""" - arrays = [make_array(rows) for _ in range(n_cols)] - fields = [StructField(f"col_{i}", spark_type) for i in range(n_cols)] - return pa.RecordBatch.from_arrays(arrays, names=[f.name for f in fields]) - - -def _build_non_grouped_scenarios(): - """Build data-shape scenarios for non-grouped Arrow eval types. - - Returns a dict mapping scenario name to ``(batch, num_batches, col0_type)``. - """ - scenarios = {} - - # Varying batch size and column count (mixed types cycling int/str/bin/bool) - for name, (rows, n_cols, num_batches) in { - "sm_batch_few_col": (1_000, 5, 1_500), - "sm_batch_many_col": (1_000, 50, 200), - "lg_batch_few_col": (10_000, 5, 3_500), - "lg_batch_many_col": (10_000, 50, 400), - }.items(): - batch, col0_type = _make_typed_batch(rows, n_cols) - scenarios[name] = (batch, num_batches, col0_type) - - # Pure-type scenarios (5000 rows, 10 cols, 1000 batches) - _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 1_000 - - scenarios["pure_ints"] = ( - _make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - IntegerType(), - ), - _PURE_BATCHES, - IntegerType(), - ) - scenarios["pure_floats"] = ( - _make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array(np.random.rand(r)), - DoubleType(), - ), - _PURE_BATCHES, - DoubleType(), - ) - scenarios["pure_strings"] = ( - _make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array([f"s{j}" for j in range(r)]), - StringType(), - ), - _PURE_BATCHES, - StringType(), - ) - scenarios["pure_ts"] = ( - _make_pure_batch( - _PURE_ROWS, - _PURE_COLS, - lambda r: pa.array( - np.arange(0, r, dtype="datetime64[us]"), type=pa.timestamp("us", tz=None) - ), - TimestampNTZType(), - ), - _PURE_BATCHES, - TimestampNTZType(), - ) - scenarios["mixed_types"] = ( - _make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], - _PURE_BATCHES, - IntegerType(), - ) - - return scenarios - - -_NON_GROUPED_SCENARIOS = _build_non_grouped_scenarios() - - -class _NonGroupedBenchMixin: - """Provides ``_write_scenario`` for non-grouped Arrow eval types. - - Subclasses set ``_eval_type``, ``_scenarios``, and ``_udfs``. - UDF entries with ``ret_type=None`` inherit ``col0_type`` from the scenario. - """ - - def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, col0_type = self._scenarios[scenario] - udf_func, ret_type, arg_offsets = self._udfs[udf_name] - if ret_type is None: - ret_type = col0_type - _write_worker_input( - self._eval_type, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b), - buf, - ) + fd, path = tempfile.mkstemp(prefix="spark-bench-replay-", suffix=".bin") + try: + with os.fdopen(fd, "w+b") as infile: + self._write_scenario(*self._args, infile) + infile.flush() + infile.seek(0) + worker_main(infile, io.BytesIO()) + finally: + try: + os.remove(path) + except FileNotFoundError: + pass # -- SQL_ARROW_BATCHED_UDF -------------------------------------------------- -# Arrow-optimized Python UDF: receives individual Python values per row, -# returns a single Python value. The wire protocol includes an extra -# ``input_type`` (StructType JSON) before the UDF payload. 
- - -def _build_arrow_batched_scenarios(): - """Build scenarios for SQL_ARROW_BATCHED_UDF. - - Returns a dict mapping scenario name to - ``(batch, num_batches, input_struct_type, col0_type)``. - ``input_struct_type`` is a StructType matching the batch schema, - needed for the wire protocol. - """ - scenarios = {} - - # Row-by-row processing is ~100x slower than columnar Arrow UDFs, - # so batch counts are much smaller to keep benchmarks under 60s. - for name, (rows, n_cols, num_batches) in { - "sm_batch_few_col": (1_000, 5, 20), - "sm_batch_many_col": (1_000, 50, 5), - "lg_batch_few_col": (10_000, 5, 5), - "lg_batch_many_col": (10_000, 50, 2), - }.items(): - batch, col0_type = _make_typed_batch(rows, n_cols) - type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()] - input_struct = StructType( - [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(n_cols)] - ) - scenarios[name] = (batch, num_batches, input_struct, col0_type) - - _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 10 - - for scenario_name, make_array, spark_type in [ - ( - "pure_ints", - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - IntegerType(), - ), - ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()), - ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - ]: - batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) - input_struct = StructType([StructField(f"col_{i}", spark_type) for i in range(_PURE_COLS)]) - scenarios[scenario_name] = (batch, _PURE_BATCHES, input_struct, spark_type) - - # mixed types - batch, col0_type = _make_typed_batch(_PURE_ROWS, _PURE_COLS) - type_cycle = [IntegerType(), StringType(), BinaryType(), BooleanType()] - input_struct = StructType( - [StructField(f"col_{i}", type_cycle[i % len(type_cycle)]) for i in range(_PURE_COLS)] - ) - scenarios["mixed_types"] = (batch, _PURE_BATCHES, input_struct, col0_type) - - return scenarios - - -_ARROW_BATCHED_SCENARIOS = _build_arrow_batched_scenarios() - - -# UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values. -# arg_offsets=[0] means the UDF receives column 0 value per row. -_ARROW_BATCHED_UDFS = { - "identity_udf": (lambda x: x, None, [0]), - "stringify_udf": (lambda x: str(x), StringType(), [0]), - "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]), -} +# UDF receives individual Python values per row, returns a single Python value. class _ArrowBatchedBenchMixin: """Provides ``_write_scenario`` for SQL_ARROW_BATCHED_UDF. - Like ``_NonGroupedBenchMixin`` but writes the extra ``input_type`` - (StructType JSON) that the wire protocol requires. + Writes the extra ``input_type`` (StructType JSON) that the wire protocol + requires before the UDF payload. """ + def _build_scenarios(): + """Build scenarios for SQL_ARROW_BATCHED_UDF. + + Returns a dict mapping scenario name to ``(batches, schema)``. + """ + scenarios = {} + + # Row-by-row processing is ~100x slower than columnar Arrow UDFs, + # so batch counts are much smaller to keep benchmarks under 60s. 
+ for name, (num_rows, num_cols, batch_size) in { + "sm_batch_few_col": (20_000, 5, 1_000), + "sm_batch_many_col": (5_000, 50, 1_000), + "lg_batch_few_col": (50_000, 5, 10_000), + "lg_batch_many_col": (20_000, 50, 10_000), + }.items(): + batches, schema = MockDataFactory.make_batches( + num_rows=num_rows, + num_cols=num_cols, + spark_type_pool=MockDataFactory.MIXED_TYPES, + batch_size=batch_size, + ) + scenarios[name] = (batches, schema) + + _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 50_000, 10, 5_000 + + for scenario_name, spark_types in [ + ( + "pure_ints", + [ + ( + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + IntegerType(), + ) + ], + ), + ("pure_floats", [(lambda r: pa.array(np.random.rand(r)), DoubleType())]), + ("pure_strings", [(lambda r: pa.array([f"s{j}" for j in range(r)]), StringType())]), + ]: + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_type_pool=spark_types, + batch_size=_BATCH_SIZE, + ) + scenarios[scenario_name] = (batches, schema) + + # mixed types + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_type_pool=MockDataFactory.MIXED_TYPES, + batch_size=_BATCH_SIZE, + ) + scenarios["mixed_types"] = (batches, schema) + + return scenarios + + _scenarios = _build_scenarios() + # UDFs for SQL_ARROW_BATCHED_UDF operate on individual Python values. + # arg_offsets=[0] means the UDF receives column 0 value per row. + _udfs = { + "identity_udf": (lambda x: x, None, [0]), + "stringify_udf": (lambda x: str(x), StringType(), [0]), + "nullcheck_udf": (lambda x: x is not None, BooleanType(), [0]), + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, input_struct, col0_type = self._scenarios[scenario] + batches, schema = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: - ret_type = col0_type + ret_type = schema.fields[0].dataType - def write_command(b): - # input_type is read before UDF payloads for ARROW_BATCHED_UDF - _write_utf8(input_struct.json(), b) - _build_udf_payload(udf_func, ret_type, arg_offsets, b) + def write_udf(b): + MockProtocolWriter.write_utf8(schema.json(), b) + MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b) - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_ARROW_BATCHED_UDF, - write_command, - lambda b: _write_arrow_ipc_batches((batch for _ in range(num_batches)), b), + write_udf, + lambda b: MockProtocolWriter.write_data_payload(iter(batches), b), buf, ) class ArrowBatchedUDFTimeBench(_ArrowBatchedBenchMixin, _TimeBenchBase): - _scenarios = _ARROW_BATCHED_SCENARIOS - _udfs = _ARROW_BATCHED_UDFS - params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)] - param_names = ["scenario", "udf"] + pass class ArrowBatchedUDFPeakmemBench(_ArrowBatchedBenchMixin, _PeakmemBenchBase): - _scenarios = _ARROW_BATCHED_SCENARIOS - _udfs = _ARROW_BATCHED_UDFS - params = [list(_ARROW_BATCHED_SCENARIOS), list(_ARROW_BATCHED_UDFS)] - param_names = ["scenario", "udf"] - - -# --------------------------------------------------------------------------- -# Grouped Arrow UDF helpers -# --------------------------------------------------------------------------- - - -def _write_grouped_arrow_data( - groups: list[tuple[pa.RecordBatch, ...]], - num_dfs: int, - buf: io.BufferedIOBase, - max_records_per_batch: int | None = None, -) -> None: - """Write 
grouped Arrow data in the wire protocol expected by _load_group_dataframes.""" - for group in groups: - write_int(num_dfs, buf) - for df_batch in group: - if max_records_per_batch and df_batch.num_rows > max_records_per_batch: - sub_batches = [ - df_batch.slice(offset, max_records_per_batch) - for offset in range(0, df_batch.num_rows, max_records_per_batch) - ] - _write_arrow_ipc_batches(iter(sub_batches), buf) - else: - _write_arrow_ipc_batches(iter([df_batch]), buf) - write_int(0, buf) # end of groups - - -def _make_grouped_arg_offsets(num_key_cols: int, num_value_cols: int) -> list[int]: - """Build arg_offsets for grouped map UDFs. - - Format: [total_len, num_keys, key_idx..., value_idx...] - All columns are in a single struct, so key indexes come first, - then value indexes. - """ - total = num_key_cols + num_value_cols + 1 # +1 for the num_keys prefix - key_idxs = list(range(num_key_cols)) - value_idxs = list(range(num_key_cols, num_key_cols + num_value_cols)) - return [total, num_key_cols] + key_idxs + value_idxs - - -def _make_grouped_batches( - num_groups: int, - rows_per_group: int, - num_key_cols: int, - num_value_cols: int, -) -> tuple[list[tuple[pa.RecordBatch, ...]], StructType]: - """Create grouped data: each group is a single batch wrapped in a struct column. - - Returns (groups, return_type) where each group is a 1-tuple of a - struct-wrapped RecordBatch suitable for ArrowStreamGroupUDFSerializer. - """ - n_cols = num_key_cols + num_value_cols - type_cycle = [ - (lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), LongType()), - (lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - (lambda r: pa.array(np.random.choice([True, False], r)), BooleanType()), - (lambda r: pa.array(np.random.rand(r)), DoubleType()), - ] - - groups = [] - for g in range(num_groups): - arrays = [type_cycle[i % len(type_cycle)][0](rows_per_group) for i in range(n_cols)] - names = [f"col_{i}" for i in range(n_cols)] - batch = pa.RecordBatch.from_arrays(arrays, names=names) - # Wrap in struct to match JVM-side encoding (flatten_struct undoes this) - struct_array = pa.StructArray.from_arrays(batch.columns, names=names) - wrapped = pa.RecordBatch.from_arrays([struct_array], names=["_0"]) - groups.append((wrapped,)) - - value_fields = [ - StructField(f"col_{i}", type_cycle[i % len(type_cycle)][1]) - for i in range(num_key_cols, n_cols) - ] - return_type = StructType(value_fields) - - return groups, return_type + pass # -- SQL_GROUPED_AGG_ARROW_UDF ------------------------------------------------ -# UDF receives pa.Array columns per group, returns scalar. - - -def _grouped_agg_arrow_sum(col): - """Sum a single Arrow column.""" - import pyarrow.compute as pc - - return pc.sum(col).as_py() - - -def _grouped_agg_arrow_mean_multi(col0, col1): - """Mean of two Arrow columns combined.""" - import pyarrow.compute as pc - - return (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) - - -_GROUPED_AGG_ARROW_UDFS = { - "sum_udf": _grouped_agg_arrow_sum, - "mean_multi_udf": _grouped_agg_arrow_mean_multi, -} - - -# -- SQL_GROUPED_AGG_ARROW_ITER_UDF ------------------------------------------ -# UDF receives Iterator[pa.Array] (or Iterator[Tuple[pa.Array, ...]]) per group, -# returns scalar. 
- - -def _grouped_agg_arrow_iter_sum(batch_iter): - """Sum across batches via iterator.""" - import pyarrow.compute as pc - - total = 0 - for col in batch_iter: - total += pc.sum(col).as_py() or 0 - return total - - -def _grouped_agg_arrow_iter_mean_multi(batch_iter): - """Mean across batches of tuples via iterator.""" - import pyarrow.compute as pc - - total = 0.0 - for col0, col1 in batch_iter: - total += (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) - return total - - -_GROUPED_AGG_ARROW_ITER_UDFS = { - "sum_udf": _grouped_agg_arrow_iter_sum, - "mean_multi_udf": _grouped_agg_arrow_iter_mean_multi, -} - - -def _make_agg_arrow_groups( - num_groups: int, - rows_per_group: int, - n_cols: int, -) -> list[tuple[pa.RecordBatch, ...]]: - """Create grouped data for agg Arrow UDFs (plain batches, not struct-wrapped).""" - type_cycle = [ - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - lambda r: pa.array(np.random.rand(r)), - lambda r: pa.array(np.random.randint(0, 100, r, dtype=np.int64)), - lambda r: pa.array(np.random.rand(r)), - ] - - groups = [] - for _ in range(num_groups): - arrays = [type_cycle[i % len(type_cycle)](rows_per_group) for i in range(n_cols)] - names = [f"col_{i}" for i in range(n_cols)] - batch = pa.RecordBatch.from_arrays(arrays, names=names) - groups.append((batch,)) - - return groups - - -def _build_grouped_agg_arrow_scenarios(): - """Build scenarios for SQL_GROUPED_AGG_ARROW_UDF / AGG_ARROW_ITER_UDF. - - Returns dict mapping name to (groups, n_cols, arg_offsets_map) where - arg_offsets_map provides offsets per UDF name. - """ - scenarios = {} - - for name, (num_groups, rows_per_group, n_cols) in { - "few_groups_sm": (50, 5_000, 5), - "few_groups_lg": (50, 50_000, 5), - "many_groups_sm": (2_000, 500, 5), - "many_groups_lg": (500, 10_000, 5), - "wide_cols": (200, 5_000, 20), - }.items(): - groups = _make_agg_arrow_groups(num_groups, rows_per_group, n_cols) - scenarios[name] = (groups, n_cols) - - return scenarios - - -_GROUPED_AGG_ARROW_SCENARIOS = _build_grouped_agg_arrow_scenarios() +# UDF receives ``pa.Array`` columns per group, returns scalar. class _GroupedAggArrowBenchMixin: """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_UDF.""" + def _grouped_agg_arrow_sum(col): + """Sum a single Arrow column.""" + import pyarrow.compute as pc + + return pc.sum(col).as_py() + + def _grouped_agg_arrow_mean_multi(col0, col1): + """Mean of two Arrow columns combined.""" + import pyarrow.compute as pc + + return (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) + + def _build_scenarios(): + """Build scenarios for SQL_GROUPED_AGG_ARROW_UDF / AGG_ARROW_ITER_UDF. + + Returns a dict mapping scenario name to ``(groups, schema)``. 
+ """ + scenarios = {} + + for name, (num_groups, rows_per_group, n_cols) in { + "few_groups_sm": (50, 5_000, 5), + "few_groups_lg": (50, 50_000, 5), + "many_groups_sm": (2_000, 500, 5), + "many_groups_lg": (500, 10_000, 5), + "wide_cols": (200, 5_000, 20), + }.items(): + groups, schema = MockDataFactory.make_batch_groups( + num_groups=num_groups, + num_rows=rows_per_group, + num_cols=n_cols, + spark_type_pool=MockDataFactory.NUMERIC_TYPES, + batch_size=rows_per_group, + ) + scenarios[name] = (groups, schema) + + return scenarios + + _scenarios = _build_scenarios() + _udfs = { + "sum_udf": _grouped_agg_arrow_sum, + "mean_multi_udf": _grouped_agg_arrow_mean_multi, + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): - groups, n_cols = self._scenarios[scenario] + groups, _schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] # sum_udf uses 1 arg, mean_multi_udf uses 2 args @@ -676,36 +498,55 @@ def _write_scenario(self, scenario, udf_name, buf): return_type = DoubleType() - def write_command(b): - _build_udf_payload(udf_func, return_type, arg_offsets, b) + def write_udf(b): + MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b) - _write_worker_input( + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF, - write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + write_udf, + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) class GroupedAggArrowUDFTimeBench(_GroupedAggArrowBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedAggArrowUDFPeakmemBench(_GroupedAggArrowBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass -class _GroupedAggArrowIterBenchMixin: +class _GroupedAggArrowIterBenchMixin(_GroupedAggArrowBenchMixin): """Provides _write_scenario for SQL_GROUPED_AGG_ARROW_ITER_UDF.""" + def _grouped_agg_arrow_iter_sum(batch_iter): + """Sum across batches via iterator.""" + import pyarrow.compute as pc + + total = 0 + for col in batch_iter: + total += pc.sum(col).as_py() or 0 + return total + + def _grouped_agg_arrow_iter_mean_multi(batch_iter): + """Mean across batches of tuples via iterator.""" + import pyarrow.compute as pc + + total = 0.0 + for col0, col1 in batch_iter: + total += (pc.mean(col0).as_py() or 0) + (pc.mean(col1).as_py() or 0) + return total + + _udfs = { + "sum_udf": _grouped_agg_arrow_iter_sum, + "mean_multi_udf": _grouped_agg_arrow_iter_mean_multi, + } + params = [_GroupedAggArrowBenchMixin.params[0], list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): - groups, n_cols = self._scenarios[scenario] + groups, _schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] # sum_udf uses 1 arg, mean_multi_udf uses 2 args @@ -716,488 +557,535 @@ def _write_scenario(self, scenario, udf_name, buf): return_type = DoubleType() - def write_command(b): - _build_udf_payload(udf_func, return_type, arg_offsets, b) + def write_udf(b): + MockProtocolWriter.write_udf_payload(udf_func, return_type, arg_offsets, b) - _write_worker_input( + 
MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF, - write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + write_udf, + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) class GroupedAggArrowIterUDFTimeBench(_GroupedAggArrowIterBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_ITER_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedAggArrowIterUDFPeakmemBench(_GroupedAggArrowIterBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_AGG_ARROW_SCENARIOS - _udfs = _GROUPED_AGG_ARROW_ITER_UDFS - params = [list(_GROUPED_AGG_ARROW_SCENARIOS), list(_GROUPED_AGG_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_GROUPED_MAP_ARROW_UDF ------------------------------------------------ -# UDF receives (key: pa.RecordBatch, values: Iterator[pa.RecordBatch]), -# returns Iterator[pa.RecordBatch]. - - -def _grouped_map_arrow_identity(table): - """Identity grouped map UDF: takes a pa.Table, returns it as-is.""" - return table - - -def _grouped_map_arrow_sort(table): - """Sort by first column.""" - return table.sort_by([(table.column_names[0], "ascending")]) - - -def _grouped_map_arrow_filter(table): - """Filter rows where first column is valid.""" - import pyarrow.compute as pc - - return table.filter(pc.is_valid(table.column(0))) - - -_GROUPED_MAP_ARROW_UDFS = { - "identity_udf": _grouped_map_arrow_identity, - "sort_udf": _grouped_map_arrow_sort, - "filter_udf": _grouped_map_arrow_filter, -} - - -def _build_grouped_map_arrow_scenarios(): - """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF. - - Returns dict mapping name to (groups, return_type, arg_offsets). - """ - scenarios = {} - - for name, (num_groups, rows_per_group, num_key_cols, num_value_cols) in { - "few_groups_sm": (50, 5_000, 1, 4), - "few_groups_lg": (50, 50_000, 1, 4), - "many_groups_sm": (2_000, 500, 1, 4), - "many_groups_lg": (500, 10_000, 1, 4), - "wide_values": (200, 5_000, 1, 20), - "multi_key": (200, 5_000, 3, 5), - }.items(): - groups, return_type = _make_grouped_batches( - num_groups, rows_per_group, num_key_cols, num_value_cols - ) - arg_offsets = _make_grouped_arg_offsets(num_key_cols, num_value_cols) - scenarios[name] = (groups, return_type, arg_offsets) - - return scenarios - - -_GROUPED_MAP_ARROW_SCENARIOS = _build_grouped_map_arrow_scenarios() +# UDF receives ``pa.Table``, returns ``pa.Table``. class _GroupedMapArrowBenchMixin: """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_UDF.""" + def _grouped_map_arrow_identity(table): + """Identity grouped map UDF: takes a pa.Table, returns it as-is.""" + return table + + def _grouped_map_arrow_sort(table): + """Sort by first column.""" + return table.sort_by([(table.column_names[0], "ascending")]) + + def _grouped_map_arrow_filter(table): + """Filter rows where first column is valid.""" + import pyarrow.compute as pc + + return table.filter(pc.is_valid(table.column(0))) + + def _build_scenarios(): + """Build scenarios for SQL_GROUPED_MAP_ARROW_UDF. + + Returns a dict mapping scenario name to ``(groups, schema)``. + ``schema`` is the value-only return type (excluding key columns). + ``arg_offsets`` are derived at write time from data and schema. 
+ """ + scenarios = {} + + for name, (num_groups, rows_per_group, num_key_cols, num_value_cols) in { + "few_groups_sm": (50, 5_000, 1, 4), + "few_groups_lg": (50, 50_000, 1, 4), + "many_groups_sm": (2_000, 500, 1, 4), + "many_groups_lg": (500, 10_000, 1, 4), + "wide_values": (200, 5_000, 1, 20), + "multi_key": (200, 5_000, 3, 5), + }.items(): + n_fields = num_key_cols + num_value_cols + struct_type = MockDataFactory.make_struct_type( + num_fields=n_fields, + base_types=MockDataFactory.MIXED_TYPES, + ) + groups, schema = MockDataFactory.make_batch_groups( + num_groups=num_groups, + num_rows=rows_per_group, + num_cols=1, + spark_type_pool=[struct_type], + batch_size=rows_per_group, + ) + inner_fields = schema.fields[0].dataType.fields + return_type = StructType(inner_fields[num_key_cols:]) + scenarios[name] = (groups, return_type) + + return scenarios + + _scenarios = _build_scenarios() + _udfs = { + "identity_udf": _grouped_map_arrow_identity, + "sort_udf": _grouped_map_arrow_sort, + "filter_udf": _grouped_map_arrow_filter, + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): - groups, return_type, arg_offsets = self._scenarios[scenario] + groups, schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] - - def write_command(b): - # Grouped map uses a single UDF with grouped arg_offsets - write_int(1, b) # num_udfs - write_int(len(arg_offsets), b) # num_arg - for offset in arg_offsets: - write_int(offset, b) - _write_bool(False, b) # is_kwarg - write_int(1, b) # num_chained - command = cloudpickle_dumps((udf_func, return_type)) - write_int(len(command), b) - b.write(command) - write_long(0, b) # result_id - - _write_worker_input( + n_total = groups[0][0].column(0).type.num_fields + n_values = len(schema.fields) + num_key_cols = n_total - n_values + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, n_values) + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_UDF, - write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b), + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) class GroupedMapArrowUDFTimeBench(_GroupedMapArrowBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedMapArrowUDFPeakmemBench(_GroupedMapArrowBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_GROUPED_MAP_ARROW_ITER_UDF ------------------------------------------ -# UDF receives Iterator[pa.RecordBatch] per group, -# returns Iterator[pa.RecordBatch]. -# Uses the same wire format and serializer as SQL_GROUPED_MAP_ARROW_UDF. - - -def _grouped_map_arrow_iter_identity(batches): - """Identity grouped map iter UDF: yields each batch as-is.""" - yield from batches - +# UDF receives ``Iterator[pa.RecordBatch]`` per group, returns ``Iterator[pa.RecordBatch]``. 
-def _grouped_map_arrow_iter_sort(batches): - """Sort each batch by first column.""" - for batch in batches: - yield batch.sort_by([(batch.column_names[0], "ascending")]) +class _GroupedMapArrowIterBenchMixin(_GroupedMapArrowBenchMixin): + """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF.""" -def _grouped_map_arrow_iter_filter(batches): - """Filter rows where first column is valid.""" - import pyarrow.compute as pc - - for batch in batches: - yield batch.filter(pc.is_valid(batch.column(0))) + def _grouped_map_arrow_iter_identity(batches): + """Identity grouped map iter UDF: yields each batch as-is.""" + yield from batches + def _grouped_map_arrow_iter_sort(batches): + """Sort each batch by first column.""" + for batch in batches: + yield batch.sort_by([(batch.column_names[0], "ascending")]) -_GROUPED_MAP_ARROW_ITER_UDFS = { - "identity_udf": _grouped_map_arrow_iter_identity, - "sort_udf": _grouped_map_arrow_iter_sort, - "filter_udf": _grouped_map_arrow_iter_filter, -} + def _grouped_map_arrow_iter_filter(batches): + """Filter rows where first column is valid.""" + import pyarrow.compute as pc + for batch in batches: + yield batch.filter(pc.is_valid(batch.column(0))) -class _GroupedMapArrowIterBenchMixin: - """Provides _write_scenario for SQL_GROUPED_MAP_ARROW_ITER_UDF.""" + _udfs = { + "identity_udf": _grouped_map_arrow_iter_identity, + "sort_udf": _grouped_map_arrow_iter_sort, + "filter_udf": _grouped_map_arrow_iter_filter, + } + params = [_GroupedMapArrowBenchMixin.params[0], list(_udfs)] + param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - groups, return_type, arg_offsets = self._scenarios[scenario] + groups, schema = self._scenarios[scenario] udf_func = self._udfs[udf_name] - - def write_command(b): - write_int(1, b) # num_udfs - write_int(len(arg_offsets), b) # num_arg - for offset in arg_offsets: - write_int(offset, b) - _write_bool(False, b) # is_kwarg - write_int(1, b) # num_chained - command = cloudpickle_dumps((udf_func, return_type)) - write_int(len(command), b) - b.write(command) - write_long(0, b) # result_id - - _write_worker_input( + n_total = groups[0][0].column(0).type.num_fields + n_values = len(schema.fields) + num_key_cols = n_total - n_values + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(num_key_cols, n_values) + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_ARROW_ITER_UDF, - write_command, - lambda b: _write_grouped_arrow_data(groups, num_dfs=1, buf=b), + lambda b: MockProtocolWriter.write_udf_payload(udf_func, schema, arg_offsets, b), + lambda b: MockProtocolWriter.write_grouped_data_payload(groups, num_dfs=1, buf=b), buf, ) class GroupedMapArrowIterUDFTimeBench(_GroupedMapArrowIterBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_ITER_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedMapArrowIterUDFPeakmemBench(_GroupedMapArrowIterBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_MAP_ARROW_SCENARIOS - _udfs = _GROUPED_MAP_ARROW_ITER_UDFS - params = [list(_GROUPED_MAP_ARROW_SCENARIOS), list(_GROUPED_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_GROUPED_MAP_PANDAS_UDF ------------------------------------------------ -# UDF receives a ``pandas.DataFrame`` per group, returns a ``pandas.DataFrame``. 
-# Groups are sent as separate Arrow IPC streams with optional sub-batching -# (``spark.sql.execution.arrow.maxRecordsPerBatch``). - -_MAX_RECORDS_PER_BATCH = 10_000 - - -def _build_grouped_map_pandas_scenarios(): - scenarios = {} - - for name, (rows, n_cols, num_groups, max_rpb) in { - "sm_grp_few_col": (1_000, 5, 200, None), - "sm_grp_many_col": (1_000, 50, 30, None), - "lg_grp_few_col": (100_000, 5, 30, _MAX_RECORDS_PER_BATCH), - "lg_grp_many_col": (100_000, 50, 5, _MAX_RECORDS_PER_BATCH), - }.items(): - batch, schema = _make_grouped_batch(rows, n_cols) - scenarios[name] = (batch, num_groups, schema, max_rpb) - - # mixed column types, small groups - batch, schema = _make_mixed_batch(3) - scenarios["mixed_types"] = (batch, 200, schema, None) - - return scenarios - - -_GROUPED_MAP_PANDAS_SCENARIOS = _build_grouped_map_pandas_scenarios() - -# Each UDF entry: (func, ret_type, n_args). -# ret_type=None means "use the input schema" (excluding key columns for n_args=2). -# n_args=1 -> func(pdf), n_args=2 -> func(key, pdf). -_GROUPED_MAP_PANDAS_UDFS = { - "identity_udf": (lambda df: df, None, 1), - "sort_udf": (lambda df: df.sort_values(df.columns[0]), None, 1), - "key_identity_udf": (lambda key, df: df, None, 2), -} +# UDF receives ``pandas.DataFrame`` per group, returns ``pandas.DataFrame``. class _GroupedMapPandasBenchMixin: """Provides ``_write_scenario`` for SQL_GROUPED_MAP_PANDAS_UDF. - Each scenario stores ``(batch, num_groups, schema, max_rpb)``. + Each scenario stores ``(groups, schema)``. Groups are written as separate Arrow IPC streams. """ + def _build_scenarios(): + scenarios = {} + + for name, (rows, n_cols, num_groups) in { + "sm_grp_few_col": (1_000, 5, 200), + "sm_grp_many_col": (1_000, 50, 30), + "lg_grp_few_col": (100_000, 5, 30), + "lg_grp_many_col": (100_000, 50, 5), + }.items(): + groups, schema = MockDataFactory.make_batch_groups( + num_groups=num_groups, + num_rows=rows, + num_cols=n_cols, + spark_type_pool=MockDataFactory.NUMERIC_TYPES, + ) + scenarios[name] = (groups, schema) + + # mixed column types, small groups + batches, schema = MockDataFactory.make_batches( + num_rows=3, + num_cols=5, + spark_type_pool=MockDataFactory.MIXED_TYPES, + batch_size=3, + ) + scenarios["mixed_types"] = ([(b,) for b in batches] * 200, schema) + + return scenarios + + _scenarios = _build_scenarios() + # Each UDF entry: (func, ret_type, n_args). + # ret_type=None means "use the input schema" (excluding key columns for n_args=2). + # n_args=1 -> func(pdf), n_args=2 -> func(key, pdf). + _udfs = { + "identity_udf": (lambda df: df, None, 1), + "sort_udf": (lambda df: df.sort_values(df.columns[0]), None, 1), + "key_identity_udf": (lambda key, df: df, None, 2), + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): - batch, num_groups, schema, max_rpb = self._scenarios[scenario] + groups, schema = self._scenarios[scenario] udf_func, ret_type, n_args = self._udfs[udf_name] if ret_type is None: # 2-arg UDFs receive (key, pdf) where pdf excludes key columns, # so the return schema must also exclude them. 
ret_type = StructType(schema.fields[n_args - 1 :]) if n_args > 1 else schema n_cols = len(schema.fields) - arg_offsets = _build_grouped_arg_offsets(n_cols, n_keys=n_args - 1) - _write_worker_input( + arg_offsets = MockUDFFactory.make_grouped_arg_offsets(n_args - 1, n_cols - (n_args - 1)) + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_grouped_arrow_data( - [(batch,)] * num_groups, + lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_grouped_data_payload( + groups, num_dfs=1, buf=b, - max_records_per_batch=max_rpb, ), buf, ) class GroupedMapPandasUDFTimeBench(_GroupedMapPandasBenchMixin, _TimeBenchBase): - _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS - _udfs = _GROUPED_MAP_PANDAS_UDFS - params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), list(_GROUPED_MAP_PANDAS_UDFS)] - param_names = ["scenario", "udf"] + pass class GroupedMapPandasUDFPeakmemBench(_GroupedMapPandasBenchMixin, _PeakmemBenchBase): - _scenarios = _GROUPED_MAP_PANDAS_SCENARIOS - _udfs = _GROUPED_MAP_PANDAS_UDFS - params = [list(_GROUPED_MAP_PANDAS_SCENARIOS), list(_GROUPED_MAP_PANDAS_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_MAP_ARROW_ITER_UDF ------------------------------------------------ # UDF receives ``Iterator[pa.RecordBatch]``, returns ``Iterator[pa.RecordBatch]``. -# Used by ``mapInArrow``. - - -def _identity_batch_iter(it): - yield from it -def _sort_batch_iter(it): - import pyarrow.compute as pc - - for batch in it: - indices = pc.sort_indices(batch.column(0)) - yield batch.take(indices) - - -def _filter_batch_iter(it): - import pyarrow.compute as pc - - for batch in it: - mask = pc.is_valid(batch.column(0)) - yield batch.filter(mask) - - -_MAP_ARROW_ITER_UDFS = { - "identity_udf": (_identity_batch_iter, None, [0]), - "sort_udf": (_sort_batch_iter, None, [0]), - "filter_udf": (_filter_batch_iter, None, [0]), -} - - -def _build_map_arrow_iter_scenarios(): - """Build scenarios for SQL_MAP_ARROW_ITER_UDF. +class _MapArrowIterBenchMixin: + """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF. - Same data shapes as non-grouped scenarios but with reduced batch counts - to account for the struct wrap/unwrap overhead per batch. + Wraps input batches in a struct column to match the JVM-side wire format + (``flatten_struct`` undoes this). 
""" - scenarios = {} - - for name, (rows, n_cols, num_batches) in { - "sm_batch_few_col": (1_000, 5, 500), - "sm_batch_many_col": (1_000, 50, 50), - "lg_batch_few_col": (10_000, 5, 500), - "lg_batch_many_col": (10_000, 50, 50), - }.items(): - batch, col0_type = _make_typed_batch(rows, n_cols) - scenarios[name] = (batch, num_batches, col0_type) - - _PURE_ROWS, _PURE_COLS, _PURE_BATCHES = 5_000, 10, 200 - - for scenario_name, make_array, spark_type in [ - ( - "pure_ints", - lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), - IntegerType(), - ), - ("pure_floats", lambda r: pa.array(np.random.rand(r)), DoubleType()), - ("pure_strings", lambda r: pa.array([f"s{j}" for j in range(r)]), StringType()), - ( - "pure_ts", - lambda r: pa.array( - np.arange(0, r, dtype="datetime64[us]"), type=pa.timestamp("us", tz=None) - ), - TimestampNTZType(), - ), - ]: - batch = _make_pure_batch(_PURE_ROWS, _PURE_COLS, make_array, spark_type) - scenarios[scenario_name] = (batch, _PURE_BATCHES, spark_type) - - scenarios["mixed_types"] = ( - _make_typed_batch(_PURE_ROWS, _PURE_COLS)[0], - _PURE_BATCHES, - IntegerType(), - ) - - return scenarios - - -_MAP_ARROW_ITER_SCENARIOS = _build_map_arrow_iter_scenarios() - - -def _wrap_batch_in_struct(batch: pa.RecordBatch) -> pa.RecordBatch: - """Wrap all columns into a single struct column, mimicking JVM-side encoding.""" - struct_array = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names) - return pa.RecordBatch.from_arrays([struct_array], names=["_0"]) + def _identity_batch_iter(it): + yield from it + + def _sort_batch_iter(it): + import pyarrow.compute as pc + + for batch in it: + indices = pc.sort_indices(batch.column(0)) + yield batch.take(indices) + + def _filter_batch_iter(it): + import pyarrow.compute as pc + + for batch in it: + mask = pc.is_valid(batch.column(0)) + yield batch.filter(mask) + + def _build_map_arrow_iter_scenarios(): + """Build scenarios for SQL_MAP_ARROW_ITER_UDF. + + Returns a dict mapping scenario name to ``(batches, schema)``. + Same data shapes as non-grouped scenarios but with reduced batch counts + to account for the struct wrap/unwrap overhead per batch. 
+ """ + scenarios = {} + + for name, (num_rows, num_cols, batch_size) in { + "sm_batch_few_col": (500_000, 5, 1_000), + "sm_batch_many_col": (50_000, 50, 1_000), + "lg_batch_few_col": (5_000_000, 5, 10_000), + "lg_batch_many_col": (500_000, 50, 10_000), + }.items(): + struct_type = MockDataFactory.make_struct_type( + num_fields=num_cols, + base_types=MockDataFactory.MIXED_TYPES, + ) + batches, schema = MockDataFactory.make_batches( + num_rows=num_rows, + num_cols=1, + spark_type_pool=[struct_type], + batch_size=batch_size, + ) + scenarios[name] = (batches, schema) + + _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 1_000_000, 10, 5_000 + + for scenario_name, spark_types in [ + ( + "pure_ints", + [ + ( + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + IntegerType(), + ) + ], + ), + ("pure_floats", [(lambda r: pa.array(np.random.rand(r)), DoubleType())]), + ("pure_strings", [(lambda r: pa.array([f"s{j}" for j in range(r)]), StringType())]), + ( + "pure_ts", + [ + ( + lambda r: pa.array( + np.arange(0, r, dtype="datetime64[us]"), + type=pa.timestamp("us", tz=None), + ), + TimestampNTZType(), + ) + ], + ), + ]: + struct_type = MockDataFactory.make_struct_type( + num_fields=_NUM_COLS, + base_types=spark_types, + ) + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=1, + spark_type_pool=[struct_type], + batch_size=_BATCH_SIZE, + ) + scenarios[scenario_name] = (batches, schema) + + struct_type = MockDataFactory.make_struct_type( + num_fields=_NUM_COLS, + base_types=MockDataFactory.MIXED_TYPES, + ) + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=1, + spark_type_pool=[struct_type], + batch_size=_BATCH_SIZE, + ) + scenarios["mixed_types"] = (batches, schema) -class _MapArrowIterBenchMixin: - """Provides ``_write_scenario`` for SQL_MAP_ARROW_ITER_UDF. + return scenarios - Like ``_NonGroupedBenchMixin`` but wraps input batches in a struct column - to match the JVM-side wire format (``flatten_struct`` undoes this). 
- """ + _scenarios = _build_map_arrow_iter_scenarios() + _udfs = { + "identity_udf": (_identity_batch_iter, None, [0]), + "sort_udf": (_sort_batch_iter, None, [0]), + "filter_udf": (_filter_batch_iter, None, [0]), + } + params = [list(_scenarios), list(_udfs)] + param_names = ["scenario", "udf"] def _write_scenario(self, scenario, udf_name, buf): - batch, num_batches, col0_type = self._scenarios[scenario] + batches, schema = self._scenarios[scenario] udf_func, ret_type, arg_offsets = self._udfs[udf_name] if ret_type is None: - ret_type = col0_type - wrapped = _wrap_batch_in_struct(batch) - _write_worker_input( + ret_type = schema.fields[0].dataType.fields[0].dataType + MockProtocolWriter.write_worker_input( PythonEvalType.SQL_MAP_ARROW_ITER_UDF, - lambda b: _build_udf_payload(udf_func, ret_type, arg_offsets, b), - lambda b: _write_arrow_ipc_batches((wrapped for _ in range(num_batches)), b), + lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_data_payload(iter(batches), b), buf, ) class MapArrowIterUDFTimeBench(_MapArrowIterBenchMixin, _TimeBenchBase): - _scenarios = _MAP_ARROW_ITER_SCENARIOS - _udfs = _MAP_ARROW_ITER_UDFS - params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass class MapArrowIterUDFPeakmemBench(_MapArrowIterBenchMixin, _PeakmemBenchBase): - _scenarios = _MAP_ARROW_ITER_SCENARIOS - _udfs = _MAP_ARROW_ITER_UDFS - params = [list(_MAP_ARROW_ITER_SCENARIOS), list(_MAP_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] + pass # -- SQL_SCALAR_ARROW_UDF --------------------------------------------------- -# UDF receives individual ``pa.Array`` columns, returns a ``pa.Array``. -# All UDFs operate on arg_offsets=[0] so they work with any column type. - - -def _sort_arrow(c): - import pyarrow.compute as pc - - return pc.take(c, pc.sort_indices(c)) - - -def _nullcheck_arrow(c): - import pyarrow.compute as pc - - return pc.is_valid(c) - - -# ret_type=None means "use col0_type from the scenario" -_SCALAR_ARROW_UDFS = { - "identity_udf": (lambda c: c, None, [0]), - "sort_udf": (_sort_arrow, None, [0]), - "nullcheck_udf": (_nullcheck_arrow, BooleanType(), [0]), -} +# UDF receives ``pa.Array`` columns, returns ``pa.Array``. + + +class _ScalarArrowBenchMixin: + """Mixin for SQL_SCALAR_ARROW_UDF benchmarks.""" + + def _sort_arrow(c): + import pyarrow.compute as pc + + return pc.take(c, pc.sort_indices(c)) + + def _nullcheck_arrow(c): + import pyarrow.compute as pc + + return pc.is_valid(c) + + def _build_scenarios(): + """Build data-shape scenarios for non-grouped Arrow eval types. + + Returns a dict mapping scenario name to ``(batches, schema)``. 
+ """ + scenarios = {} + + for name, (num_rows, num_cols, batch_size) in { + "sm_batch_few_col": (1_500_000, 5, 1_000), + "sm_batch_many_col": (200_000, 50, 1_000), + "lg_batch_few_col": (35_000_000, 5, 10_000), + "lg_batch_many_col": (4_000_000, 50, 10_000), + }.items(): + batches, schema = MockDataFactory.make_batches( + num_rows=num_rows, + num_cols=num_cols, + spark_type_pool=MockDataFactory.MIXED_TYPES, + batch_size=batch_size, + ) + scenarios[name] = (batches, schema) + + _NUM_ROWS, _NUM_COLS, _BATCH_SIZE = 5_000_000, 10, 5_000 + + for scenario_name, spark_types in [ + ( + "pure_ints", + [ + ( + lambda r: pa.array(np.random.randint(0, 1000, r, dtype=np.int64)), + IntegerType(), + ) + ], + ), + ("pure_floats", [(lambda r: pa.array(np.random.rand(r)), DoubleType())]), + ("pure_strings", [(lambda r: pa.array([f"s{j}" for j in range(r)]), StringType())]), + ( + "pure_ts", + [ + ( + lambda r: pa.array( + np.arange(0, r, dtype="datetime64[us]"), + type=pa.timestamp("us", tz=None), + ), + TimestampNTZType(), + ) + ], + ), + ]: + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_type_pool=spark_types, + batch_size=_BATCH_SIZE, + ) + scenarios[scenario_name] = (batches, schema) + + batches, schema = MockDataFactory.make_batches( + num_rows=_NUM_ROWS, + num_cols=_NUM_COLS, + spark_type_pool=MockDataFactory.MIXED_TYPES, + batch_size=_BATCH_SIZE, + ) + scenarios["mixed_types"] = (batches, schema) + return scenarios -class ScalarArrowUDFTimeBench(_NonGroupedBenchMixin, _TimeBenchBase): _eval_type = PythonEvalType.SQL_SCALAR_ARROW_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_UDFS)] + _scenarios = _build_scenarios() + # ret_type=None means "use schema.fields[0].dataType from the scenario" + _udfs = { + "identity_udf": (lambda c: c, None, [0]), + "sort_udf": (_sort_arrow, None, [0]), + "nullcheck_udf": (_nullcheck_arrow, BooleanType(), [0]), + } + params = [list(_scenarios), list(_udfs)] param_names = ["scenario", "udf"] - -class ScalarArrowUDFPeakmemBench(_NonGroupedBenchMixin, _PeakmemBenchBase): - _eval_type = PythonEvalType.SQL_SCALAR_ARROW_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_UDFS)] - param_names = ["scenario", "udf"] + def _write_scenario(self, scenario, udf_name, buf): + batches, schema = self._scenarios[scenario] + udf_func, ret_type, arg_offsets = self._udfs[udf_name] + if ret_type is None: + ret_type = schema.fields[0].dataType + MockProtocolWriter.write_worker_input( + self._eval_type, + lambda b: MockProtocolWriter.write_udf_payload(udf_func, ret_type, arg_offsets, b), + lambda b: MockProtocolWriter.write_data_payload(iter(batches), b), + buf, + ) -# -- SQL_SCALAR_ARROW_ITER_UDF ---------------------------------------------- -# UDF receives ``Iterator[pa.Array]``, returns ``Iterator[pa.Array]``. +class ScalarArrowUDFTimeBench(_ScalarArrowBenchMixin, _TimeBenchBase): + pass -def _identity_iter(it): - return (c for c in it) +class ScalarArrowUDFPeakmemBench(_ScalarArrowBenchMixin, _PeakmemBenchBase): + pass -def _sort_iter(it): - import pyarrow.compute as pc +# -- SQL_SCALAR_ARROW_ITER_UDF ---------------------------------------------- +# UDF receives ``Iterator[pa.Array]``, returns ``Iterator[pa.Array]``. 
- for c in it: - yield pc.take(c, pc.sort_indices(c)) +class _ScalarArrowIterBenchMixin(_ScalarArrowBenchMixin): + """Mixin for SQL_SCALAR_ARROW_ITER_UDF benchmarks.""" -def _nullcheck_iter(it): - import pyarrow.compute as pc + def _identity_iter(it): + return (c for c in it) - for c in it: - yield pc.is_valid(c) + def _sort_iter(it): + import pyarrow.compute as pc + for c in it: + yield pc.take(c, pc.sort_indices(c)) -_SCALAR_ARROW_ITER_UDFS = { - "identity_udf": (_identity_iter, None, [0]), - "sort_udf": (_sort_iter, None, [0]), - "nullcheck_udf": (_nullcheck_iter, BooleanType(), [0]), -} + def _nullcheck_iter(it): + import pyarrow.compute as pc + for c in it: + yield pc.is_valid(c) -class ScalarArrowIterUDFTimeBench(_NonGroupedBenchMixin, _TimeBenchBase): _eval_type = PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_ITER_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_ITER_UDFS)] + _udfs = { + "identity_udf": (_identity_iter, None, [0]), + "sort_udf": (_sort_iter, None, [0]), + "nullcheck_udf": (_nullcheck_iter, BooleanType(), [0]), + } + params = [list(_ScalarArrowBenchMixin._scenarios), list(_udfs)] param_names = ["scenario", "udf"] -class ScalarArrowIterUDFPeakmemBench(_NonGroupedBenchMixin, _PeakmemBenchBase): - _eval_type = PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF - _scenarios = _NON_GROUPED_SCENARIOS - _udfs = _SCALAR_ARROW_ITER_UDFS - params = [list(_NON_GROUPED_SCENARIOS), list(_SCALAR_ARROW_ITER_UDFS)] - param_names = ["scenario", "udf"] +class ScalarArrowIterUDFTimeBench(_ScalarArrowIterBenchMixin, _TimeBenchBase): + pass + + +class ScalarArrowIterUDFPeakmemBench(_ScalarArrowIterBenchMixin, _PeakmemBenchBase): + pass
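For reference, below is a minimal standalone sketch (illustrative only, not part of the patch) of the replay flow these benchmarks exercise. It reuses the same temp-file-then-``worker_main`` pattern as ``peakmem_worker`` and assumes the benchmark module is importable as ``bench_eval_type``; the scenario and UDF names ("pure_ints", "identity_udf") are taken from ``ScalarArrowUDFTimeBench`` above.

# Illustrative sketch, assuming this file is importable as ``bench_eval_type``.
import io
import tempfile

from pyspark.worker import main as worker_main

import bench_eval_type as bench  # hypothetical import path; adjust to your setup

# Instantiate one benchmark case and replay a single scenario through the worker,
# exactly as the ASV peakmem/time benchmarks do internally.
case = bench.ScalarArrowUDFTimeBench()

with tempfile.TemporaryFile() as infile:  # binary mode by default ("w+b")
    # Write preamble + UDF payload + Arrow IPC data + END_OF_STREAM for one scenario.
    case._write_scenario("pure_ints", "identity_udf", infile)
    infile.flush()
    infile.seek(0)
    # Replay the recorded stream; the worker's output is discarded.
    worker_main(infile, io.BytesIO())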