
perf: replace CometBatchIterator FFI input path with the Arrow C Stream Interface#4572

Draft
mbutrovich wants to merge 65 commits into apache:main from mbutrovich:arrow-stream-reader


@mbutrovich mbutrovich commented Jun 2, 2026

Contributor

Which issue does this PR close?

Closes #3770.

Note: this PR was peeled off the draft experimental PR #4393 (which is not intended to merge). For now it also carries the commits from #4507 (native shuffle optimizations), so the diff is temporarily inflated and will shrink once #4507 merges. The description below covers only this PR's own scope (the Arrow C Stream Interface input path), not the #4507 native shuffle work.

Rationale for this change

  • The JVM-to-native input path used a bespoke CometBatchIterator plus a per-batch FFI deep copy, guarded by an arrow_ffi_safe flag, because the JVM could reuse or mutate a batch's buffers after handing it off. Every batch crossing the boundary was copied.
  • The Arrow C Stream Interface is the canonical, zero-copy way to hand an Arrow stream across FFI with proper ownership transfer, so both the deep copy and the flag become unnecessary.
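To make the rationale concrete, here is a toy model in plain Rust (no Arrow involved) of the two hand-off styles. `ReusingProducer` and `OwningProducer` are hypothetical names invented for this sketch; they stand in for a producer that may overwrite a batch after handing it off, and for a stream that transfers ownership of each batch. This is only an illustration of why the copy becomes unnecessary, not Comet's or arrow-rs's code.

```rust
/// Hypothetical stand-in for the old path: one scratch buffer is reused for
/// every batch, so a batch is only valid until the next call.
struct ReusingProducer {
    scratch: Vec<i64>,
    next: i64,
}

impl ReusingProducer {
    fn new() -> Self {
        ReusingProducer { scratch: vec![0; 4], next: 0 }
    }

    /// Overwrites the shared buffer in place and lends it to the caller.
    fn next_batch(&mut self) -> &[i64] {
        for slot in self.scratch.iter_mut() {
            *slot = self.next;
            self.next += 1;
        }
        &self.scratch
    }
}

/// A consumer of the reusing producer has to deep-copy each batch to keep it.
fn collect_with_copy(producer: &mut ReusingProducer, batches: usize) -> Vec<Vec<i64>> {
    (0..batches)
        .map(|_| producer.next_batch().to_vec())
        .collect()
}

/// Hypothetical stand-in for a stream with ownership transfer: every batch is
/// its own allocation and is moved to the consumer.
struct OwningProducer {
    next: i64,
}

impl Iterator for OwningProducer {
    type Item = Vec<i64>;

    fn next(&mut self) -> Option<Vec<i64>> {
        let batch: Vec<i64> = (self.next..self.next + 4).collect();
        self.next += 4;
        Some(batch)
    }
}

/// A consumer of the owning producer keeps the batches as handed over,
/// without copying their contents.
fn collect_by_move(producer: &mut OwningProducer, batches: usize) -> Vec<Vec<i64>> {
    producer.by_ref().take(batches).collect()
}
```

Both consumers end up with the same data; the difference is that `collect_with_copy` pays one `to_vec()` per batch, which is the cost the old per-batch deep copy imposed on every batch crossing the boundary.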

What changes are included in this PR?

  • JVM exports each per-partition Iterator[ColumnarBatch] as an org.apache.arrow.c.ArrowArrayStream (Data.exportArrayStream); native takes ownership via from_raw. CometBatchIterator.java and the arrow_ffi_safe proto field/plumbing are removed.
  • CometExecIterator / CometExecRDD now pass an Array[Object] of already-exported ArrowArrayStream (or CometShuffleBlockIterator) slots instead of CometBatchIterator.
  • New ArrowReader implementations bridging Spark data to Arrow: RowArrowReader (InternalRow), SparkColumnarArrowReader (non-Arrow Spark ColumnarBatch), ColumnarBatchArrowReader (Arrow-backed ColumnarBatch, with VSR ownership transfer).
  • New CometNativeArrowSource trait: an operator supplies one per-partition reader and gets both the JVM columnar path (doExecuteColumnar) and the native C Stream path (doExecuteAsArrowStream). Implemented by CometLocalTableScanExec and CometSparkToColumnarExec.
  • Native AlignedArrowStreamReader wraps arrow-rs's stream reader to align buffers per imported batch (the JVM exports 8-byte-aligned buffers, which trip arrow-rs's alignment assertion). This is a temporary workaround: the upstream fix, arrow-rs#10030 ("Call align_buffers() in from_ffi, remove redundant call from arrow-pyarrow"), ships in arrow 59.0.0, after which this reader can be dropped. scan.rs drops the per-batch deep copy.
  • reconcileStreamSchema advertises the truthful first-batch Arrow schema (not the consumer's declared types) so native ScanExec's boundary cast fires; logs one deduped warning per type drift (e.g. width_bucket return-type drift).
  • Unrelated to the Arrow C Stream work but too entangled to peel off cleanly from #4393 (feat: enable CometLocalTableScanExec by default): CometLocalTableScanExec now mixes in DataTypeSupport and runs a schema-level fallback in convert, so a LocalTableScanExec whose schema carries a type with no ArrowWriter coverage (Spark 4.1 TimeType, intervals, etc.) falls back to Spark instead of failing at the boundary. NullType is allow-listed since ArrowWriter handles it.
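The buffer-alignment workaround in the list above follows a general "copy only when misaligned" pattern. The sketch below models that pattern with the Rust standard library only; it does not use arrow-rs, and `is_aligned`, `AlignedCopy`, `MaybeAligned`, and `ensure_aligned` are hypothetical names for illustration, not the PR's AlignedArrowStreamReader.

```rust
/// True when `bytes` starts at an address that is a multiple of `align`.
fn is_aligned(bytes: &[u8], align: usize) -> bool {
    assert!(align.is_power_of_two());
    (bytes.as_ptr() as usize) % align == 0
}

/// Owned copy of a buffer whose visible slice starts on an `align` boundary.
struct AlignedCopy {
    storage: Vec<u8>, // over-allocated backing memory
    offset: usize,    // index of the first aligned byte inside `storage`
    len: usize,       // number of payload bytes
}

impl AlignedCopy {
    fn new(src: &[u8], align: usize) -> Self {
        assert!(align.is_power_of_two());
        // Over-allocate by `align` bytes so an aligned start always fits.
        let mut storage = vec![0u8; src.len() + align];
        let addr = storage.as_ptr() as usize;
        let offset = (align - addr % align) % align;
        storage[offset..offset + src.len()].copy_from_slice(src);
        AlignedCopy { storage, offset, len: src.len() }
    }

    fn as_slice(&self) -> &[u8] {
        &self.storage[self.offset..self.offset + self.len]
    }
}

/// Either the original buffer (already aligned) or an aligned copy of it.
enum MaybeAligned<'a> {
    Borrowed(&'a [u8]),
    Copied(AlignedCopy),
}

impl<'a> MaybeAligned<'a> {
    fn as_slice(&self) -> &[u8] {
        match self {
            MaybeAligned::Borrowed(bytes) => *bytes,
            MaybeAligned::Copied(copy) => copy.as_slice(),
        }
    }
}

/// Keeps the zero-copy path when possible and pays for a copy only when the
/// incoming buffer does not meet the required alignment.
fn ensure_aligned(src: &[u8], align: usize) -> MaybeAligned<'_> {
    if is_aligned(src, align) {
        MaybeAligned::Borrowed(src)
    } else {
        MaybeAligned::Copied(AlignedCopy::new(src, align))
    }
}
```

For example, a slice that starts 8 bytes past a 16-byte boundary passes `ensure_aligned(src, 8)` untouched but is copied by `ensure_aligned(src, 16)`. The design point is that already-aligned buffers never pay for a copy.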

How are these changes tested?

  • Existing suites exercise the input path end to end (CometExecSuite, CometShuffleSuite, ParquetReadSuite, the fuzz suites).
  • New CometArrowStreamSuite covering stream export and schema reconciliation, added to the Linux and macOS PR build workflows.
  • New CometExecSuite cases for CometLocalTableScanExec: a TimeType schema-level fallback check (Spark 4.1+), plus two Arrow-buffer leak checks (project consumer and collect_list) that fail via the allocator leak detector if the per-batch buffers leak.
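For readers unfamiliar with leak-detector style tests, the idea behind the two Arrow-buffer leak checks can be modeled roughly as follows. This is a simplified, hypothetical sketch in plain Rust (`LeakTracker`, `TrackedBuf`, and `check_no_leaks` are invented names), not the Arrow allocator or the actual CometExecSuite code: every allocation is counted, every release is subtracted, and the check fails if anything is still outstanding at the end.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical allocator-side bookkeeping: bytes handed out but not yet freed.
#[derive(Default)]
struct LeakTracker {
    outstanding: Cell<usize>,
}

/// A buffer that registers itself with the tracker for its whole lifetime.
struct TrackedBuf {
    bytes: Vec<u8>,
    tracker: Rc<LeakTracker>,
}

impl TrackedBuf {
    fn new(tracker: &Rc<LeakTracker>, len: usize) -> Self {
        tracker.outstanding.set(tracker.outstanding.get() + len);
        TrackedBuf { bytes: vec![0; len], tracker: Rc::clone(tracker) }
    }
}

impl Drop for TrackedBuf {
    /// Releasing the buffer gives its bytes back to the tracker.
    fn drop(&mut self) {
        let tracker = &self.tracker;
        tracker.outstanding.set(tracker.outstanding.get() - self.bytes.len());
    }
}

/// The end-of-test check: any outstanding bytes count as a leak.
fn check_no_leaks(tracker: &LeakTracker) -> Result<(), String> {
    match tracker.outstanding.get() {
        0 => Ok(()),
        n => Err(format!("leak detected: {n} bytes still allocated")),
    }
}
```

A buffer that goes out of scope normally leaves the tracker at zero, while one that is never released (simulated with `std::mem::forget`) makes `check_no_leaks` return an error, which is the failure mode the new test cases are designed to surface.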

mbutrovich and others added 30 commits May 21, 2026 16:44
…nonical Arrow C Stream Interface (JVM Data.exportArrayStream <-> native ArrowArrayStreamReader), eliminating the per-batch FFI deep copy and the arrow_ffi_safe flag.
@mbutrovich mbutrovich added this to the 0.17.0 milestone Jun 2, 2026
@mbutrovich mbutrovich self-assigned this Jun 2, 2026

mbutrovich commented Jun 5, 2026

Contributor Author

Ran TPC-H SF1000 with Comet on Spark 3.5.8:

[Images: tpch_queries_compare, tpch_allqueries]
| Query   | main (s) | #4507 (s) | #4572 (s) |
|---------|---------:|----------:|----------:|
| TPCH-01 |   10.495 |    10.151 |    10.722 |
| TPCH-02 |   23.455 |    22.177 |    21.629 |
| TPCH-03 |   12.366 |    14.064 |    10.347 |
| TPCH-04 |   14.067 |    13.360 |    11.927 |
| TPCH-05 |   30.023 |    29.119 |    27.642 |
| TPCH-06 |    1.128 |     0.710 |     0.942 |
| TPCH-07 |   16.368 |    15.073 |    13.730 |
| TPCH-08 |   36.566 |    35.680 |    33.435 |
| TPCH-09 |   44.722 |    41.845 |    40.223 |
| TPCH-10 |   29.848 |    29.106 |    26.074 |
| TPCH-11 |   15.373 |    14.837 |    15.752 |
| TPCH-12 |    7.919 |     7.486 |     6.638 |
| TPCH-13 |   10.194 |    10.103 |     9.761 |
| TPCH-14 |    2.631 |     2.672 |     2.389 |
| TPCH-15 |   11.613 |    12.524 |    12.202 |
| TPCH-16 |   11.207 |     9.778 |     8.745 |
| TPCH-17 |   31.002 |    28.755 |    29.156 |
| TPCH-18 |   62.856 |    56.601 |    56.502 |
| TPCH-19 |    9.253 |     9.319 |     8.556 |
| TPCH-20 |    9.015 |     8.610 |     7.722 |
| TPCH-21 |   74.530 |    69.337 |    71.159 |
| TPCH-22 |   10.640 |     9.574 |     9.813 |
| Total   |  475.272 |   450.879 |   435.066 |

Looks like it further improves on #4507.
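To put numbers on that, the relative improvement can be computed from the Total row of the results above. The snippet below is just that arithmetic; the constant and function names are made up for illustration, and the inputs are the three totals as reported (475.272 s, 450.879 s, 435.066 s).

```rust
/// Totals in seconds, copied from the "Total" row of the TPC-H results.
const MAIN_TOTAL_S: f64 = 475.272;
const PR_4507_TOTAL_S: f64 = 450.879;
const PR_4572_TOTAL_S: f64 = 435.066;

/// Percentage by which `candidate_s` reduces runtime relative to `baseline_s`.
fn pct_reduction(baseline_s: f64, candidate_s: f64) -> f64 {
    (baseline_s - candidate_s) / baseline_s * 100.0
}

/// One-line summary of this PR's total runtime against both baselines.
fn summary() -> String {
    format!(
        "#4572 vs main: {:.2}% faster; #4572 vs #4507: {:.2}% faster",
        pct_reduction(MAIN_TOTAL_S, PR_4572_TOTAL_S),
        pct_reduction(PR_4507_TOTAL_S, PR_4572_TOTAL_S),
    )
}
```

On these totals the reduction works out to roughly 8.5% against main and roughly 3.5% against #4507. This is a single run, and several individual queries (for example TPCH-01, TPCH-11, and TPCH-21) are slower on #4572 than on #4507, so the per-query figures are worth reading alongside the total.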

mbutrovich and others added 4 commits June 5, 2026 09:32
# Conflicts:
#	spark/src/main/scala/org/apache/spark/sql/comet/CometExecRDD.scala
#	spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleInputRDD.scala
#	spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala
#	spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala
#	spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
Development

Successfully merging this pull request may close these issues.

Remove mutable buffer use from CometArrowConverters
