Skip to content

Commit e65e8a6

Browse files
refactor: Unify bigquery execution paths
1 parent 3a4be75 commit e65e8a6

File tree

9 files changed

+355
-311
lines changed

9 files changed

+355
-311
lines changed

bigframes/core/blocks.py

Lines changed: 55 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -69,7 +69,7 @@
6969
import bigframes.exceptions as bfe
7070
import bigframes.operations as ops
7171
import bigframes.operations.aggregations as agg_ops
72-
from bigframes.session import dry_runs
72+
from bigframes.session import dry_runs, execution_spec
7373
from bigframes.session import executor as executors
7474

7575
# Type constraint for wherever column labels are used
@@ -257,7 +257,10 @@ def shape(self) -> typing.Tuple[int, int]:
257257
except Exception:
258258
pass
259259

260-
row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
260+
row_count = self.session._executor.execute(
261+
self.expr.row_count(),
262+
execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
263+
).to_py_scalar()
261264
return (row_count, len(self.value_columns))
262265

263266
@property
@@ -539,8 +542,17 @@ def to_arrow(
539542
allow_large_results: Optional[bool] = None,
540543
) -> Tuple[pa.Table, Optional[bigquery.QueryJob]]:
541544
"""Run query and download results as a pyarrow Table."""
545+
under_10gb = (
546+
(not allow_large_results)
547+
if (allow_large_results is not None)
548+
else bigframes.options._allow_large_results
549+
)
542550
execute_result = self.session._executor.execute(
543-
self.expr, ordered=ordered, use_explicit_destination=allow_large_results
551+
self.expr,
552+
execution_spec.ExecutionSpec(
553+
promise_under_10gb=under_10gb,
554+
ordered=ordered,
555+
),
544556
)
545557
pa_table = execute_result.to_arrow_table()
546558

@@ -629,8 +641,15 @@ def try_peek(
629641
self, n: int = 20, force: bool = False, allow_large_results=None
630642
) -> typing.Optional[pd.DataFrame]:
631643
if force or self.expr.supports_fast_peek:
632-
result = self.session._executor.peek(
633-
self.expr, n, use_explicit_destination=allow_large_results
644+
# really, we should just block insane peek values and always assume <10gb
645+
under_10gb = (
646+
(not allow_large_results)
647+
if (allow_large_results is not None)
648+
else bigframes.options._allow_large_results
649+
)
650+
result = self.session._executor.execute(
651+
self.expr,
652+
execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
634653
)
635654
df = result.to_pandas()
636655
return self._copy_index_to_pandas(df)
@@ -647,10 +666,18 @@ def to_pandas_batches(
647666
648667
page_size and max_results determine the size and number of batches,
649668
see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result"""
669+
670+
under_10gb = (
671+
(not allow_large_results)
672+
if (allow_large_results is not None)
673+
else bigframes.options._allow_large_results
674+
)
650675
execute_result = self.session._executor.execute(
651676
self.expr,
652-
ordered=True,
653-
use_explicit_destination=allow_large_results,
677+
execution_spec.ExecutionSpec(
678+
promise_under_10gb=under_10gb,
679+
ordered=True,
680+
),
654681
)
655682

656683
# To reduce the number of edge cases to consider when working with the
@@ -696,10 +723,17 @@ def _materialize_local(
696723
) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
697724
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
698725
# TODO(swast): Allow for dry run and timeout.
726+
under_10gb = (
727+
(not materialize_options.allow_large_results)
728+
if (materialize_options.allow_large_results is not None)
729+
else bigframes.options._allow_large_results
730+
)
699731
execute_result = self.session._executor.execute(
700732
self.expr,
701-
ordered=materialize_options.ordered,
702-
use_explicit_destination=materialize_options.allow_large_results,
733+
execution_spec.ExecutionSpec(
734+
promise_under_10gb=under_10gb,
735+
ordered=True,
736+
),
703737
)
704738
sample_config = materialize_options.downsampling
705739
if execute_result.total_bytes is not None:
@@ -1616,9 +1650,19 @@ def retrieve_repr_request_results(
16161650
config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"),
16171651
)
16181652
head_result = self.session._executor.execute(
1619-
self.expr.slice(start=None, stop=max_results, step=None)
1653+
self.expr.slice(start=None, stop=max_results, step=None),
1654+
execution_spec.ExecutionSpec(
1655+
promise_under_10gb=True,
1656+
ordered=True,
1657+
),
16201658
)
1621-
row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
1659+
row_count = self.session._executor.execute(
1660+
self.expr.row_count(),
1661+
execution_spec.ExecutionSpec(
1662+
promise_under_10gb=True,
1663+
ordered=False,
1664+
),
1665+
).to_py_scalar()
16221666

16231667
head_df = head_result.to_pandas()
16241668
return self._copy_index_to_pandas(head_df), row_count, head_result.query_job

bigframes/core/indexes/base.py

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
import bigframes.operations as ops
3939
import bigframes.operations.aggregations as agg_ops
4040
import bigframes.series
41+
import bigframes.session.execution_spec as ex_spec
4142

4243
if typing.TYPE_CHECKING:
4344
import bigframes.dataframe
@@ -283,8 +284,9 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
283284
# Check if key exists at all by counting
284285
count_agg = ex.UnaryAggregation(agg_ops.count_op, ex.deref(offsets_id))
285286
count_result = filtered_block._expr.aggregate([(count_agg, "count")])
287+
286288
count_scalar = self._block.session._executor.execute(
287-
count_result
289+
count_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
288290
).to_py_scalar()
289291

290292
if count_scalar == 0:
@@ -295,7 +297,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
295297
min_agg = ex.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id))
296298
position_result = filtered_block._expr.aggregate([(min_agg, "position")])
297299
position_scalar = self._block.session._executor.execute(
298-
position_result
300+
position_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
299301
).to_py_scalar()
300302
return int(position_scalar)
301303

@@ -326,7 +328,10 @@ def _get_monotonic_slice(self, filtered_block, offsets_id: str) -> slice:
326328
combined_result = filtered_block._expr.aggregate(min_max_aggs)
327329

328330
# Execute query and extract positions
329-
result_df = self._block.session._executor.execute(combined_result).to_pandas()
331+
result_df = self._block.session._executor.execute(
332+
combined_result,
333+
execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True),
334+
).to_pandas()
330335
min_pos = int(result_df["min_pos"].iloc[0])
331336
max_pos = int(result_df["max_pos"].iloc[0])
332337

bigframes/dataframe.py

Lines changed: 34 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,7 @@
8484
import bigframes.operations.structs
8585
import bigframes.series
8686
import bigframes.session._io.bigquery
87+
import bigframes.session.execution_spec as ex_spec
8788

8889
if typing.TYPE_CHECKING:
8990
from _typeshed import SupportsRichComparison
@@ -4184,17 +4185,19 @@ def to_csv(
41844185
index=index and self._has_index,
41854186
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
41864187
)
4187-
options = {
4188+
options: dict[str, Union[bool, str]] = {
41884189
"field_delimiter": sep,
41894190
"header": header,
41904191
}
4191-
query_job = self._session._executor.export_gcs(
4192+
result = self._session._executor.execute(
41924193
export_array.rename_columns(id_overrides),
4193-
path_or_buf,
4194-
format="csv",
4195-
export_options=options,
4194+
ex_spec.ExecutionSpec(
4195+
ex_spec.GcsOutputSpec(
4196+
uri=path_or_buf, format="csv", export_options=tuple(options.items())
4197+
)
4198+
),
41964199
)
4197-
self._set_internal_query_job(query_job)
4200+
self._set_internal_query_job(result.query_job)
41984201
return None
41994202

42004203
def to_json(
@@ -4237,13 +4240,13 @@ def to_json(
42374240
index=index and self._has_index,
42384241
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
42394242
)
4240-
query_job = self._session._executor.export_gcs(
4243+
result = self._session._executor.execute(
42414244
export_array.rename_columns(id_overrides),
4242-
path_or_buf,
4243-
format="json",
4244-
export_options={},
4245+
ex_spec.ExecutionSpec(
4246+
ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=())
4247+
),
42454248
)
4246-
self._set_internal_query_job(query_job)
4249+
self._set_internal_query_job(result.query_job)
42474250
return None
42484251

42494252
def to_gbq(
@@ -4316,16 +4319,21 @@ def to_gbq(
43164319
)
43174320
)
43184321

4319-
query_job = self._session._executor.export_gbq(
4322+
result = self._session._executor.execute(
43204323
export_array.rename_columns(id_overrides),
4321-
destination=destination,
4322-
cluster_cols=clustering_fields,
4323-
if_exists=if_exists,
4324+
ex_spec.ExecutionSpec(
4325+
ex_spec.TableOutputSpec(
4326+
destination,
4327+
cluster_cols=tuple(clustering_fields),
4328+
if_exists=if_exists,
4329+
)
4330+
),
43244331
)
4325-
self._set_internal_query_job(query_job)
4332+
assert result.query_job is not None
4333+
self._set_internal_query_job(result.query_job)
43264334

43274335
# The query job should have finished, so there should be always be a result table.
4328-
result_table = query_job.destination
4336+
result_table = result.query_job.destination
43294337
assert result_table is not None
43304338

43314339
if temp_table_ref:
@@ -4393,13 +4401,17 @@ def to_parquet(
43934401
index=index and self._has_index,
43944402
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
43954403
)
4396-
query_job = self._session._executor.export_gcs(
4404+
result = self._session._executor.execute(
43974405
export_array.rename_columns(id_overrides),
4398-
path,
4399-
format="parquet",
4400-
export_options=export_options,
4406+
ex_spec.ExecutionSpec(
4407+
ex_spec.GcsOutputSpec(
4408+
uri=path,
4409+
format="parquet",
4410+
export_options=tuple(export_options.items()),
4411+
)
4412+
),
44014413
)
4402-
self._set_internal_query_job(query_job)
4414+
self._set_internal_query_job(result.query_job)
44034415
return None
44044416

44054417
def to_dict(

0 commit comments

Comments (0)