[Data] Propagate Schema in _shuffle_block Empty Block Case to ColumnNotFound Error in Chained Left Joins#61507

Open
rayhhome wants to merge 6 commits into ray-project:master from rayhhome:join-empty

Conversation

Contributor

@rayhhome rayhhome commented Mar 5, 2026

Description

Per #60013, chained left joins fail with ColumnNotFoundError when the first join produces empty intermediate blocks. #60520 attempted the fix, but a refined reproduction script shows it does not resolve the underlying issue. This PR proposes a targeted fix and a deterministic regression test.

Root cause

Using the example in #60013, when the streaming executor feeds the second join's input, the first block delivered can have zero rows. The bug is then triggered through the following sequence:

  1. _do_add_input_inner sees that this is the first block for input sequence 0 (or 1), so it submits a _shuffle_block task with send_empty_blocks=True and immediately sets self._has_schemas_broadcasted[input_index] = True;
  2. The remote _shuffle_block worker task takes the empty-block path and returns (empty_metadata, {}) early. No aggregator.submit() calls are ever made, so the schema never reaches any aggregator;
  3. All subsequent blocks are submitted with send_empty_blocks=False. Aggregators that receive no non-empty data are never contacted at all, leaving their bucket queues empty;
  4. At finalization, drain_queue() returns [] for those partitions, so _combine([]) builds an ArrowBlockBuilder with no add_block() calls and produces an empty table with no columns;
  5. When JoiningAggregation.finalize() calls pa.Table.join() on this columnless table, it raises ColumnNotFoundError as observed.

Why #60520 does not fix this issue

#60520 modifies ArrowBlockBuilder.build() to use a stored self._schema when len(tables) == 0. However, self._schema is only populated inside add_block() calls. When partition_shards is [] in _combine(...), add_block() is never called and self._schema remains None.

This fix

In _shuffle_block, when block.num_rows == 0 and send_empty_blocks=True, explicitly broadcast schema-carrying empty tables to every aggregator before returning. This mirrors the broadcast logic for non-empty blocks, which ensures every aggregator holds at least one schema-carrying block and thus finalizes correctly.

Related issues

Fixes #60013 and follows up on #60520.

Additional information

The original reproduction script in #60013 occasionally misses the error because the order of blocks fed to the second join is nondeterministic. To force the bug, add the following lines to the reproduction script:

    ...
    shapes = [b.shape for b in blocks]
    print(f"Columns flattened via map_batches: {flatten_columns}")
    print("Block shapes after first join:", shapes)

    # ----- Add the following lines -----
    # Force the bug
    # The streaming executor delivers blocks in completion order, so non-empty
    # partitions finish faster and arrive first, letting schema broadcast succeed
    # silently.  Reconstructing the dataset with empty blocks at the front
    # guarantees that _shuffle_block() sees a zero-row block as the very first
    # block for the left input sequence, triggering the premature
    # _has_schemas_broadcasted flag and the resulting (0,0) empty-table bug.
    import pyarrow as pa
    empty   = [b for b in blocks if b.num_rows == 0]
    nonempty = [b for b in blocks if b.num_rows > 0]
    assert empty, "No empty blocks found — cannot reproduce the bug with this dataset."
    print(f"Reordering: {len(empty)} empty blocks first, then {len(nonempty)} non-empty.")
    ds_joined = ray.data.from_arrow(empty + nonempty)
    print("Block shapes after reordering:", [b.shape for b in (ray.get(ref) for ref in ds_joined.get_internal_block_refs())])
    # ----------------------------------

    # Create mapping table
    # Use some of the location_ids for the mapping
    shared_location_ids = location_ids[: max(1, len(location_ids) // 3)]
    ...

The augmented script forces the order of blocks so that the first block going into the second join is always empty.

The new test case in test_join.py places the empty block in a list fed to from_arrow, preserving the block order and ensuring that the second join will always see the empty block first. The bug fires reliably on every run before the fix.

rayhhome added 2 commits March 4, 2026 15:36
Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
@rayhhome rayhhome self-assigned this Mar 5, 2026
Copilot AI review requested due to automatic review settings March 5, 2026 00:49
@rayhhome rayhhome requested a review from a team as a code owner March 5, 2026 00:49
@rayhhome rayhhome added the data Ray Data-related issues label Mar 5, 2026

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Contributor

Copilot AI left a comment


Pull request overview

Fixes a Ray Data hash-shuffle join edge case where a first empty-row block prevents schema broadcast to aggregators, producing columnless empty tables that later trigger ColumnNotFoundError in chained left joins.

Changes:

  • Update _shuffle_block() to broadcast schema-carrying empty tables to all aggregators when the first received block is empty and send_empty_blocks=True.
  • Add a deterministic regression test that forces the “empty block arrives first” ordering to reproduce the chained left-join failure.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

  • python/ray/data/_internal/execution/operators/hash_shuffle.py: Ensures schema propagation even when the first shuffled block has num_rows == 0 and schema broadcast is required.
  • python/ray/data/tests/test_join.py: Adds a regression test that deterministically reproduces and validates the fix for chained left-outer joins with empty intermediate blocks.
Comments suppressed due to low confidence (1)

python/ray/data/_internal/execution/operators/hash_shuffle.py:295

  • In the block.num_rows == 0 and send_empty_blocks branch, this broadcasts by calling _create_empty_table(block.schema) and ray.put(...) once per partition. For large num_partitions this creates many identical objects in the object store and duplicates logic that already exists in the non-empty path. Consider reusing the existing partition loop by only early-returning when send_empty_blocks is false, or at least ray.put() a single schema-carrying empty table once and reuse the same ObjectRef for all aggregator.submit calls.
    if block.num_rows == 0:
        if send_empty_blocks:
            # NOTE: Perform the schema broadcast explicitly
            #       Every aggregator gets a (0, N) schema-carrying placeholder even when
            #       the triggering block itself is empty.
            pending = []
            for partition_id in range(pool.num_partitions):
                aggregator = pool.get_aggregator_for_partition(partition_id)
                partition_ref = ray.put(_create_empty_table(block.schema))
                pending.append(
                    aggregator.submit.remote(input_index, partition_id, partition_ref)
                )
            # Wait for all schema-carrier blocks to be accepted before returning.
            # This mirrors the synchronization in the normal (non-empty) path and
            # ensures aggregators hold the schema before finalize() is called.
            while pending:
                _, pending = ray.wait(pending, num_returns=len(pending), timeout=1)



Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request provides a well-reasoned fix for a ColumnNotFoundError that occurs during chained left joins with empty intermediate blocks. The approach of explicitly broadcasting a schema-carrying empty block to all aggregators is a solid solution to ensure schema propagation. The included regression test is excellent, as it deterministically reproduces the issue and verifies the fix. I have one suggestion to simplify the waiting logic by using ray.get() instead of a while loop with ray.wait(), which would make the code more concise and robust in handling task failures. Overall, this is a high-quality contribution.

Note: Security Review did not run due to the size of the PR.
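The simplification being suggested can be illustrated with `concurrent.futures` standing in for Ray tasks (a sketch, not the actual hash_shuffle code): a single blocking gather replaces the manual wait loop and re-raises task failures instead of silently looping.

```python
from concurrent.futures import ThreadPoolExecutor, wait

def submit_to_aggregator(partition_id: int) -> int:
    # Stand-in for aggregator.submit.remote(...)
    return partition_id

with ThreadPoolExecutor(max_workers=4) as ex:
    pending = [ex.submit(submit_to_aggregator, pid) for pid in range(4)]

    # Wait-loop style (mirrors the while/ray.wait pattern in the PR):
    done, not_done = wait(pending)
    assert not not_done

    # Gather style (mirrors ray.get(pending)): blocks until all tasks
    # finish and propagates the first failure as an exception.
    results = [f.result() for f in pending]

assert sorted(results) == [0, 1, 2, 3]
```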

Signed-off-by: Sirui Huang <ray.huang@anyscale.com>
Contributor

@iamjustinhsu iamjustinhsu left a comment


nice

]

# The first block must be the empty one so the bug fires.
joined_1 = ray.data.from_arrow([empty_block] + data_blocks)
Contributor


I'm not sure this is true unless ctx.execution_options.preserve_order = True

Contributor Author


I traced through from_arrow and I think the preserve_order variable is not involved in its operations. I see it only referenced in MapOperator, UnionOperator, and OutputSplitter, but not in InputDataBuffer, which I believe is the only operator involved here.

), f"Missing columns. Got: {result.columns.tolist()}"

# Verify per-row correctness
for _, row in result.iterrows():
Contributor


I think it's a bit hard to read how you are verifying the output. What if you did something like

expected_table = pd.DataFrame({...})
assert result == expected_table

I don't know the syntax off the top of my head, but that way it's easier to read the expected output
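One way to realize this suggestion with pandas (the column names and values here are hypothetical, not from the actual test):

```python
import pandas as pd

# Hypothetical joined output; in the real test this would come from the
# Ray Dataset, e.g. result = ds_joined.to_pandas().
result = pd.DataFrame({"location_id": [1, 2], "name": ["a", "b"]})

# Spell out the full expected table instead of checking row by row.
expected = pd.DataFrame({"location_id": [1, 2], "name": ["a", "b"]})

# Sort and reset the index so row order does not affect the comparison.
pd.testing.assert_frame_equal(
    result.sort_values("location_id").reset_index(drop=True),
    expected,
)
```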


Successfully merging this pull request may close these issues.

[Data] Dataset Left-Outer join produces empty blocks, causing errors in downstream joins
