Skip to content

Commit df2fe36

Browse files
committed
UNPICK changes to review
1 parent 50fe1c1 commit df2fe36

File tree

12 files changed

+62
-667
lines changed

12 files changed

+62
-667
lines changed

docs/source/conf.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,6 @@
7272
suppress_warnings = ["autoapi.python_import_resolution"]
7373
autoapi_python_class_content = "both"
7474
autoapi_keep_files = False # set to True for debugging generated files
75-
autoapi_options = [
76-
"members",
77-
"undoc-members",
78-
"special-members",
79-
"show-inheritance",
80-
]
8175

8276

8377
def autoapi_skip_member_fn(app, what, name, obj, skip, options) -> bool: # noqa: ARG001

docs/source/user-guide/dataframe/index.rst

Lines changed: 1 addition & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -145,66 +145,10 @@ To materialize the results of your DataFrame operations:
145145
146146
# Display results
147147
df.show() # Print tabular format to console
148-
148+
149149
# Count rows
150150
count = df.count()
151151
152-
PyArrow Streaming
153-
-----------------
154-
155-
DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
156-
zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
157-
Earlier versions eagerly converted the entire DataFrame when exporting to
158-
PyArrow, which could exhaust memory on large datasets. With streaming, batches
159-
are produced lazily so you can process arbitrarily large results without
160-
out-of-memory errors.
161-
162-
.. code-block:: python
163-
164-
import pyarrow as pa
165-
166-
# Create a PyArrow RecordBatchReader without materializing all batches
167-
reader = pa.RecordBatchReader.from_stream(df)
168-
for batch in reader:
169-
... # process each batch as it is produced
170-
171-
DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
172-
objects lazily so you can loop over results directly without importing
173-
PyArrow:
174-
175-
.. code-block:: python
176-
177-
for batch in df:
178-
... # each batch is a ``datafusion.RecordBatch``
179-
180-
Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
181-
table without collecting everything eagerly:
182-
183-
.. code-block:: python
184-
185-
import pyarrow as pa
186-
table = pa.Table.from_batches(b.to_pyarrow() for b in df)
187-
188-
Asynchronous iteration is supported as well, allowing integration with
189-
``asyncio`` event loops:
190-
191-
.. code-block:: python
192-
193-
async for batch in df:
194-
... # process each batch as it is produced
195-
196-
To work with the stream directly, use
197-
``to_record_batch_stream()``, which returns a
198-
:class:`~datafusion.RecordBatchStream`:
199-
200-
.. code-block:: python
201-
202-
stream = df.to_record_batch_stream()
203-
for batch in stream:
204-
...
205-
206-
See :doc:`../io/arrow` for additional details on the Arrow interface.
207-
208152
HTML Rendering
209153
--------------
210154

python/datafusion/dataframe.py

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@
2525
from typing import (
2626
TYPE_CHECKING,
2727
Any,
28-
AsyncIterator,
2928
Iterable,
30-
Iterator,
3129
Literal,
3230
Optional,
3331
Union,
@@ -44,7 +42,7 @@
4442
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
4543
from datafusion.expr import Expr, SortExpr, sort_or_default
4644
from datafusion.plan import ExecutionPlan, LogicalPlan
47-
from datafusion.record_batch import RecordBatch, RecordBatchStream
45+
from datafusion.record_batch import RecordBatchStream
4846

4947
if TYPE_CHECKING:
5048
import pathlib
@@ -291,9 +289,6 @@ def __init__(
291289
class DataFrame:
292290
"""Two dimensional table representation of data.
293291
294-
DataFrame objects are iterable; iterating over a DataFrame yields
295-
:class:`datafusion.RecordBatch` instances lazily.
296-
297292
See :ref:`user_guide_concepts` in the online documentation for more information.
298293
"""
299294

@@ -310,7 +305,7 @@ def into_view(self) -> pa.Table:
310305
return self.df.into_view()
311306

312307
def __getitem__(self, key: str | list[str]) -> DataFrame:
313-
"""Return a new :py:class:`DataFrame` with the specified column or columns.
308+
"""Return a new :py:class:`DataFrame` with the specified column or columns.
314309
315310
Args:
316311
key: Column name or list of column names to select.
@@ -1031,10 +1026,6 @@ def execute_stream(self) -> RecordBatchStream:
10311026
"""
10321027
return RecordBatchStream(self.df.execute_stream())
10331028

1034-
def to_record_batch_stream(self) -> RecordBatchStream:
1035-
"""Return a :class:`RecordBatchStream` executing this DataFrame."""
1036-
return self.execute_stream()
1037-
10381029
def execute_stream_partitioned(self) -> list[RecordBatchStream]:
10391030
"""Executes this DataFrame and returns a stream for each partition.
10401031
@@ -1044,15 +1035,6 @@ def execute_stream_partitioned(self) -> list[RecordBatchStream]:
10441035
streams = self.df.execute_stream_partitioned()
10451036
return [RecordBatchStream(rbs) for rbs in streams]
10461037

1047-
def to_record_batch_stream(self) -> RecordBatchStream:
1048-
"""Return a :py:class:`RecordBatchStream` over this DataFrame's results.
1049-
1050-
Returns:
1051-
A ``RecordBatchStream`` representing the lazily generated record
1052-
batches for this DataFrame.
1053-
"""
1054-
return self.execute_stream()
1055-
10561038
def to_pandas(self) -> pd.DataFrame:
10571039
"""Execute the :py:class:`DataFrame` and convert it into a Pandas DataFrame.
10581040
@@ -1116,33 +1098,21 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
11161098
return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))
11171099

11181100
def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
1119-
"""Export the DataFrame as an Arrow C Stream.
1101+
"""Export an Arrow PyCapsule Stream.
11201102
1121-
The DataFrame is executed using DataFusion's streaming APIs and exposed via
1122-
Arrow's C Stream interface. Record batches are produced incrementally, so the
1123-
full result set is never materialized in memory. When ``requested_schema`` is
1124-
provided, only straightforward projections such as column selection or
1125-
reordering are applied.
1103+
This will execute and collect the DataFrame. We will attempt to respect the
1104+
requested schema, but only trivial transformations will be applied such as only
1105+
returning the fields listed in the requested schema if their data types match
1106+
those in the DataFrame.
11261107
11271108
Args:
11281109
requested_schema: Attempt to provide the DataFrame using this schema.
11291110
11301111
Returns:
1131-
Arrow PyCapsule object representing an ``ArrowArrayStream``.
1112+
Arrow PyCapsule object.
11321113
"""
1133-
# ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
1134-
# ``execute_stream_partitioned`` under the hood to stream batches while
1135-
# preserving the original partition order.
11361114
return self.df.__arrow_c_stream__(requested_schema)
11371115

1138-
def __iter__(self) -> Iterator[RecordBatch]:
1139-
"""Return an iterator over this DataFrame's record batches."""
1140-
return iter(self.to_record_batch_stream())
1141-
1142-
def __aiter__(self) -> AsyncIterator[RecordBatch]:
1143-
"""Return an async iterator over this DataFrame's record batches."""
1144-
return self.to_record_batch_stream().__aiter__()
1145-
11461116
def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
11471117
"""Apply a function to the current DataFrame which returns another DataFrame.
11481118

python/datafusion/record_batch.py

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -46,26 +46,6 @@ def to_pyarrow(self) -> pa.RecordBatch:
4646
"""Convert to :py:class:`pa.RecordBatch`."""
4747
return self.record_batch.to_pyarrow()
4848

49-
def __arrow_c_array__(
50-
self, requested_schema: object | None = None
51-
) -> tuple[object, object]:
52-
"""Export the record batch via the Arrow C Data Interface.
53-
54-
This allows zero-copy interchange with libraries that support the
55-
`Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
56-
CDataInterface/PyCapsuleInterface.html>`_.
57-
58-
Args:
59-
requested_schema: Attempt to provide the record batch using this
60-
schema. Only straightforward projections such as column
61-
selection or reordering are applied.
62-
63-
Returns:
64-
Two Arrow PyCapsule objects representing the ``ArrowArray`` and
65-
``ArrowSchema``.
66-
"""
67-
return self.record_batch.__arrow_c_array__(requested_schema)
68-
6949

7050
class RecordBatchStream:
7151
"""This class represents a stream of record batches.
@@ -83,19 +63,19 @@ def next(self) -> RecordBatch:
8363
return next(self)
8464

8565
async def __anext__(self) -> RecordBatch:
86-
"""Return the next :py:class:`RecordBatch` in the stream asynchronously."""
66+
"""Await the next batch from the underlying stream and wrap it in a :py:class:`RecordBatch`."""
8767
next_batch = await self.rbs.__anext__()
8868
return RecordBatch(next_batch)
8969

9070
def __next__(self) -> RecordBatch:
91-
"""Return the next :py:class:`RecordBatch` in the stream."""
71+
"""Fetch the next batch from the underlying stream and wrap it in a :py:class:`RecordBatch`."""
9272
next_batch = next(self.rbs)
9373
return RecordBatch(next_batch)
9474

9575
def __aiter__(self) -> typing_extensions.Self:
96-
"""Return an asynchronous iterator over record batches."""
76+
"""Return this stream itself as its own asynchronous iterator."""
9777
return self
9878

9979
def __iter__(self) -> typing_extensions.Self:
100-
"""Return an iterator over record batches."""
80+
"""Return this stream itself as its own iterator."""
10181
return self

python/tests/conftest.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import pyarrow as pa
1919
import pytest
20-
from datafusion import DataFrame, SessionContext
20+
from datafusion import SessionContext
2121
from pyarrow.csv import write_csv
2222

2323

@@ -49,12 +49,3 @@ def database(ctx, tmp_path):
4949
delimiter=",",
5050
schema_infer_max_records=10,
5151
)
52-
53-
54-
@pytest.fixture
55-
def fail_collect(monkeypatch):
56-
def _fail_collect(self, *args, **kwargs): # pragma: no cover - failure path
57-
msg = "collect should not be called"
58-
raise AssertionError(msg)
59-
60-
monkeypatch.setattr(DataFrame, "collect", _fail_collect)

0 commit comments

Comments
 (0)