Commit b903833

feat: Add __dataframe__ interchange support
1 parent 8804ada commit b903833
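
This commit makes BigFrames DataFrames producers of the dataframe interchange protocol (__dataframe__), so any protocol-aware library can consume them. A minimal consumer-side sketch, assuming pandas >= 1.5 (which provides pandas.api.interchange.from_dataframe); the table name is a placeholder:

import bigframes.pandas as bpd
import pandas as pd

bf_df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")  # placeholder table

# from_dataframe accepts any object exposing __dataframe__; it calls
# bf_df.__dataframe__() and materializes the result as a local pandas DataFrame.
pd_df = pd.api.interchange.from_dataframe(bf_df)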

5 files changed: +335, -143 lines


bigframes/core/blocks.py

Lines changed: 17 additions & 14 deletions
@@ -38,6 +38,7 @@
     Optional,
     Sequence,
     Tuple,
+    TYPE_CHECKING,
     Union,
 )
 import warnings
@@ -73,6 +74,9 @@
 from bigframes.session import dry_runs, execution_spec
 from bigframes.session import executor as executors
 
+if TYPE_CHECKING:
+    from bigframes.session.executor import ExecuteResult
+
 # Type constraint for wherever column labels are used
 Label = typing.Hashable
 
@@ -408,13 +412,15 @@ def reset_index(
         col_level: Union[str, int] = 0,
         col_fill: typing.Hashable = "",
         allow_duplicates: bool = False,
+        replacement: Optional[bigframes.enums.DefaultIndexKind] = None,
     ) -> Block:
         """Reset the index of the block, promoting the old index to a value column.
 
         Arguments:
             level: the label or index level of the index levels to remove.
             name: this is the column id for the new value id derived from the old index
-            allow_duplicates:
+            allow_duplicates: if false, duplicate col labels will result in error
+            replacement: if not null, will override default index replacement type
 
         Returns:
             A new Block because dropping index columns can break references
@@ -429,23 +435,19 @@ def reset_index(
         level_ids = self.index_columns
 
         expr = self._expr
+        replacement_idx_type = replacement or self.session._default_index_type
         if set(self.index_columns) > set(level_ids):
             new_index_cols = [col for col in self.index_columns if col not in level_ids]
             new_index_labels = [self.col_id_to_index_name[id] for id in new_index_cols]
-        elif (
-            self.session._default_index_type
-            == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
-        ):
+        elif replacement_idx_type == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
             expr, new_index_col_id = expr.promote_offsets()
             new_index_cols = [new_index_col_id]
             new_index_labels = [None]
-        elif self.session._default_index_type == bigframes.enums.DefaultIndexKind.NULL:
+        elif replacement_idx_type == bigframes.enums.DefaultIndexKind.NULL:
             new_index_cols = []
             new_index_labels = []
         else:
-            raise ValueError(
-                f"Unrecognized default index kind: {self.session._default_index_type}"
-            )
+            raise ValueError(f"Unrecognized default index kind: {replacement_idx_type}")
 
         if drop:
             # Even though the index might be part of the ordering, keep that
@@ -634,15 +636,17 @@ def to_pandas(
             max_download_size, sampling_method, random_state
         )
 
-        df, query_job = self._materialize_local(
+        ex_result = self._materialize_local(
             materialize_options=MaterializationOptions(
                 downsampling=sampling,
                 allow_large_results=allow_large_results,
                 ordered=ordered,
             )
         )
+        df = ex_result.to_pandas()
+        df = self._copy_index_to_pandas(df)
         df.set_axis(self.column_labels, axis=1, copy=False)
-        return df, query_job
+        return df, ex_result.query_job
 
     def _get_sampling_option(
         self,
@@ -750,7 +754,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
 
     def _materialize_local(
         self, materialize_options: MaterializationOptions = MaterializationOptions()
-    ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
+    ) -> ExecuteResult:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         under_10gb = (
@@ -819,8 +823,7 @@ def _materialize_local(
                 MaterializationOptions(ordered=materialize_options.ordered)
             )
         else:
-            df = execute_result.to_pandas()
-            return self._copy_index_to_pandas(df), execute_result.query_job
+            return execute_result
 
     def _downsample(
         self, total_rows: int, sampling_method: str, fraction: float, random_state
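
Note on the reset_index change above: the new replacement argument overrides the session-wide default index kind for a single call; the interchange module (next file) passes DefaultIndexKind.NULL so no sequential index column is generated before exporting to Arrow. A small sketch of the precedence logic in isolation, with the hypothetical session_default parameter standing in for self.session._default_index_type:

from typing import Optional

import bigframes.enums as enums

def resolve_index_kind(
    replacement: Optional[enums.DefaultIndexKind],
    session_default: enums.DefaultIndexKind,
) -> enums.DefaultIndexKind:
    # Same precedence as the diff above: an explicit replacement wins,
    # otherwise the session default applies.
    return replacement or session_default

# Default behavior is unchanged when no replacement is given...
assert (
    resolve_index_kind(None, enums.DefaultIndexKind.SEQUENTIAL_INT64)
    == enums.DefaultIndexKind.SEQUENTIAL_INT64
)
# ...while the interchange path forces NULL to skip index generation entirely.
assert (
    resolve_index_kind(enums.DefaultIndexKind.NULL, enums.DefaultIndexKind.SEQUENTIAL_INT64)
    == enums.DefaultIndexKind.NULL
)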

bigframes/core/interchange.py

Lines changed: 155 additions & 0 deletions
@@ -0,0 +1,155 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+
+import dataclasses
+import functools
+from typing import Any, Dict, Iterable, Optional, Sequence, TYPE_CHECKING
+
+from bigframes.core import blocks
+import bigframes.enums
+
+if TYPE_CHECKING:
+    import bigframes.dataframe
+
+
+@dataclasses.dataclass(frozen=True)
+class InterchangeColumn:
+    _dataframe: InterchangeDataFrame
+    _pos: int
+
+    @functools.cache
+    def _arrow_column(self):
+        # Conservatively downloads the whole underlying dataframe
+        # This is much better if multiple columns end up being used,
+        # but does incur a lot of overhead otherwise.
+        return self._dataframe._arrow_dataframe().get_column(self._pos)
+
+    def size(self) -> int:
+        return self._arrow_column().size()
+
+    @property
+    def offset(self) -> int:
+        return self._arrow_column().offset
+
+    @property
+    def dtype(self):
+        return self._arrow_column().dtype
+
+    @property
+    def describe_categorical(self):
+        raise TypeError(f"Column type {self.dtype} is not categorical")
+
+    @property
+    def describe_null(self):
+        return self._arrow_column().describe_null
+
+    @property
+    def null_count(self):
+        return self._arrow_column().null_count
+
+    @property
+    def metadata(self) -> Dict[str, Any]:
+        return self._arrow_column().metadata
+
+    def num_chunks(self) -> int:
+        return self._arrow_column().num_chunks()
+
+    def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable:
+        return self._arrow_column().get_chunks(n_chunks=n_chunks)
+
+    def get_buffers(self):
+        return self._arrow_column().get_buffers()
+
+
+@dataclasses.dataclass(frozen=True)
+class InterchangeDataFrame:
+    """
+    Implements the dataframe interchange format.
+
+    Mostly implemented by downloading result to pyarrow, and using pyarrow interchange implementation.
+    """
+
+    _value: blocks.Block
+
+    version: int = 0  # version of the protocol
+
+    def __dataframe__(
+        self, nan_as_null: bool = False, allow_copy: bool = True
+    ) -> InterchangeDataFrame:
+        return self
+
+    @classmethod
+    def _from_bigframes(cls, df: bigframes.dataframe.DataFrame):
+        block = df._block.with_column_labels(
+            [str(label) for label in df._block.column_labels]
+        )
+        return cls(block)
+
+    # In future, could potentially rely on executor to refetch batches efficiently with caching,
+    # but safest for now to just request a single execution and save the whole table.
+    @functools.cache
+    def _arrow_dataframe(self):
+        arrow_table, _ = self._value.reset_index(
+            replacement=bigframes.enums.DefaultIndexKind.NULL
+        ).to_arrow(allow_large_results=False)
+        return arrow_table.__dataframe__()
+
+    @property
+    def metadata(self):
+        # Allows round-trip without materialization
+        return {"bigframes.block": self._value}
+
+    def num_columns(self) -> int:
+        """
+        Return the number of columns in the DataFrame.
+        """
+        return len(self._value.value_columns)
+
+    def num_rows(self) -> Optional[int]:
+        return self._value.shape[0]
+
+    def num_chunks(self) -> int:
+        return self._arrow_dataframe().num_chunks()
+
+    def column_names(self) -> Iterable[str]:
+        return [col for col in self._value.column_labels]
+
+    def get_column(self, i: int) -> InterchangeColumn:
+        return InterchangeColumn(self, i)
+
+    # For single column getters, we download the whole dataframe still
+    # This is inefficient in some cases, but more efficient in other
+    def get_column_by_name(self, name: str) -> InterchangeColumn:
+        col_id = self._value.resolve_label_exact(name)
+        assert col_id is not None
+        pos = self._value.value_columns.index(col_id)
+        return InterchangeColumn(self, pos)
+
+    def get_columns(self) -> Iterable[InterchangeColumn]:
+        return [InterchangeColumn(self, i) for i in range(self.num_columns())]
+
+    def select_columns(self, indices: Sequence[int]) -> InterchangeDataFrame:
+        col_ids = [self._value.value_columns[i] for i in indices]
+        new_value = self._value.select_columns(col_ids)
+        return InterchangeDataFrame(new_value)
+
+    def select_columns_by_name(self, names: Sequence[str]) -> InterchangeDataFrame:
+        col_ids = [self._value.resolve_label_exact(name) for name in names]
+        assert all(id is not None for id in col_ids)
+        new_value = self._value.select_columns(col_ids)  # type: ignore
+        return InterchangeDataFrame(new_value)
+
+    def get_chunks(self, n_chunks: Optional[int] = None) -> Iterable:
+        return self._arrow_dataframe().get_chunks(n_chunks)
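
Design note on the file above: the Block is materialized to a pyarrow Table once (cached with functools.cache), and the protocol calls are then forwarded to pyarrow's own interchange objects. A standalone sketch of that delegation target, assuming a pyarrow version that implements __dataframe__ on pyarrow.Table (11.0 or later):

import pyarrow as pa

table = pa.table({"name": ["a", "b", "c"], "value": [1, 2, None]})
idf = table.__dataframe__()  # pyarrow's interchange DataFrame

print(idf.num_columns(), idf.num_rows())  # 2 3
col = idf.get_column_by_name("value")
print(col.null_count, col.num_chunks())   # 1 1
# get_buffers() exposes the raw Arrow buffers that InterchangeColumn forwards verbatim.
buffers = col.get_buffers()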

bigframes/dataframe.py

Lines changed: 6 additions & 0 deletions
@@ -67,6 +67,7 @@
 import bigframes.core.guid
 import bigframes.core.indexers as indexers
 import bigframes.core.indexes as indexes
+import bigframes.core.interchange
 import bigframes.core.ordering as order
 import bigframes.core.utils as utils
 import bigframes.core.validations as validations
@@ -1645,6 +1646,11 @@ def corrwith(
         )
         return bigframes.pandas.Series(block)
 
+    def __dataframe__(
+        self, nan_as_null: bool = False, allow_copy: bool = True
+    ) -> bigframes.core.interchange.InterchangeDataFrame:
+        return bigframes.core.interchange.InterchangeDataFrame._from_bigframes(self)
+
     def to_arrow(
         self,
         *,
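
The metadata property of InterchangeDataFrame exposes the underlying Block under the "bigframes.block" key, which lets a BigFrames-aware consumer recover the deferred expression without downloading anything. A hypothetical consumer-side helper (the function name and fallback are illustrative, not part of this commit):

def extract_bigframes_block(obj):
    """Return the deferred BigFrames Block if obj round-trips, else None."""
    interchange_df = obj.__dataframe__()
    metadata = getattr(interchange_df, "metadata", None) or {}
    # "bigframes.block" is the key set by InterchangeDataFrame.metadata.
    return metadata.get("bigframes.block")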
