Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def _shuffle_block(
block, block_type=BlockType.ARROW
)

if block.num_rows == 0:
if block.num_rows == 0 and not send_empty_blocks:
empty = BlockAccessor.for_block(block).get_metadata(
block_exec_stats=stats.build(block_ser_time_s=0),
)
Expand Down Expand Up @@ -354,7 +354,7 @@ def _shuffle_block(
block_exec_stats=stats.build(block_ser_time_s=0)
)

if logger.isEnabledFor(logging.DEBUG):
if logger.isEnabledFor(logging.DEBUG) and partition_shards_stats:
num_rows_series, byte_sizes_series = zip(
*[(s.num_rows, s.byte_size) for s in partition_shards_stats.values()]
)
Expand Down
93 changes: 90 additions & 3 deletions python/ray/data/tests/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,6 @@ def test_join_with_predicate_pushdown(
else:
filtered_ds = joined.filter(expr=col("right_val") < 1000)

# Verify correctness by computing expected result with pandas
from ray.data._internal.util import rows_same

left_pd = left_ds.to_pandas()
right_pd = right_ds.to_pandas()

Expand Down Expand Up @@ -850,6 +847,96 @@ def test_join_cross_side_column_comparison_no_pushdown(ray_start_regular_shared_
)


def test_chained_left_outer_join_with_empty_blocks(ray_start_regular_shared_2_cpus):
    """Regression test for https://github.com/ray-project/ray/issues/60013.

    Bug being guarded against
    -------------------------
    If the very first block _shuffle_block() sees for an input sequence has
    zero rows, it used to return early without forwarding anything to the
    aggregators, while the caller still flipped
    _has_schemas_broadcasted[input_index] to True. All later blocks for that
    sequence then ran with send_empty_blocks=False, so any aggregator that
    happened to receive only empty data finalized _combine([]) into a
    (0 rows, 0 columns) table. The downstream join then raised
    ColumnNotFoundError because the join key column was missing entirely.

    Test strategy
    -------------
    Rather than chaining a real first join, we construct its simulated output
    directly with ray.data.from_blocks(): the leading block is an explicit
    zero-row Arrow table that still carries the full three-column schema,
    followed by ten single-row blocks. from_blocks() preserves block order and
    count, so the empty block is guaranteed to be the first one the second
    join's shuffle processes. Using num_partitions=20 against only 10 data
    rows leaves at least 10 aggregator partitions with no non-empty left-side
    input. Pre-fix, those partitions produced (0, 0) tables and the join blew
    up with ColumnNotFoundError; post-fix, schema-carrier blocks are broadcast
    even for an empty first block, so every aggregator finalizes correctly.
    """
    import pyarrow as pa

    # Full schema the empty leading block must carry: simulates the column
    # layout of a first left-outer join's output.
    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int64()),
            pa.field("a_val", pa.string()),
            pa.field("b_val", pa.string()),
        ]
    )
    # Zero rows, three columns — schema-only carrier block.
    schema_only_block = arrow_schema.empty_table()

    # Ten single-row blocks; b_val is populated only for id >= 5, mirroring a
    # left-outer join where half the rows had no right-side match.
    row_blocks = []
    for i in range(10):
        row_blocks.append(
            pa.table(
                {
                    "id": pa.array([i], type=pa.int64()),
                    "a_val": pa.array([f"a_{i}"], type=pa.string()),
                    "b_val": pa.array(
                        [f"b_{i}" if i >= 5 else None], type=pa.string()
                    ),
                }
            )
        )

    # The empty block MUST come first for the bug to trigger; from_blocks
    # keeps block order and count exactly as given.
    left_side = ray.data.from_blocks([schema_only_block] + row_blocks)

    # Right-hand dataset for the chained (second) join.
    right_side = ray.data.from_arrow(
        pa.table(
            {
                "id": pa.array(range(10), type=pa.int64()),
                "c_val": pa.array([f"c_{i}" for i in range(10)], type=pa.string()),
            }
        )
    )

    # 20 partitions vs. 10 data rows guarantees some aggregators receive no
    # non-empty left-side data — exactly the condition that exposed the bug.
    chained = left_side.join(
        right_side,
        join_type="left_outer",
        on=("id",),
        num_partitions=20,
    )

    actual = chained.to_pandas()

    expected = pd.DataFrame(
        {
            "id": list(range(10)),
            "a_val": [f"a_{i}" for i in range(10)],
            "b_val": [f"b_{i}" if i >= 5 else None for i in range(10)],
            "c_val": [f"c_{i}" for i in range(10)],
        }
    )

    assert rows_same(actual, expected)


if __name__ == "__main__":
import sys

Expand Down
Loading