
Commit ced3618

fix tokyo tests, improve prepare_plan helper
1 parent 1c20901 commit ced3618

File tree

2 files changed: +32 additions, −63 deletions

bigframes/session/bq_caching_executor.py

Lines changed: 26 additions & 61 deletions
@@ -15,7 +15,6 @@
 from __future__ import annotations
 
 import math
-import os
 import threading
 from typing import Literal, Mapping, Optional, Sequence, Tuple
 import warnings
@@ -40,7 +39,6 @@
 import bigframes.core.schema as schemata
 import bigframes.core.tree_properties as tree_properties
 import bigframes.dtypes
-import bigframes.features
 from bigframes.session import (
     executor,
     loader,
@@ -176,7 +174,7 @@ def to_sql(
         if offset_column:
             array_value, _ = array_value.promote_offsets()
         node = (
-            self.prepare_plan(array_value.node, locality="original")
+            self.prepare_plan(array_value.node, target="simplify")
             if enable_cache
             else array_value.node
         )
@@ -191,15 +189,14 @@ def execute(
     ) -> executor.ExecuteResult:
         # TODO: Support export jobs in combination with semi executors
         if execution_spec.destination_spec is None:
-            plan = self.prepare_plan(array_value.node, locality="original")
+            plan = self.prepare_plan(array_value.node, target="simplify")
             for exec in self._semi_executors:
                 maybe_result = exec.execute(
                     plan, ordered=execution_spec.ordered, peek=execution_spec.peek
                 )
                 if maybe_result:
                     return maybe_result
 
-        # next: prepare output spec
         if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec):
             if execution_spec.peek or execution_spec.ordered:
                 raise NotImplementedError(
@@ -273,11 +270,14 @@ def _export_gbq(
         """
         Export the ArrayValue to an existing BigQuery table.
         """
+        plan = self.prepare_plan(array_value.node, target="bq_execution")
 
         # validate destination table
         existing_table = self._maybe_find_existing_table(spec)
 
-        sql = self.to_sql(array_value, ordered=False)
+        compiled = compile.compile_sql(compile.CompileRequest(plan, sort_rows=False))
+        sql = compiled.sql
+
         if (existing_table is not None) and _if_schema_match(
             existing_table.schema, array_value.schema
         ):
@@ -433,17 +433,27 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
 
     def prepare_plan(
         self,
-        root: nodes.BigFrameNode,
-        locality: Literal["bq_compat", "original"] = "bq_compat",
+        plan: nodes.BigFrameNode,
+        target: Literal["simplify", "bq_execution"] = "simplify",
     ) -> nodes.BigFrameNode:
         """
-        Apply universal logical simplifications that are helpful regardless of engine.
+        Prepare the plan by simplifying it with caches and removing unused operators. Has modes for different contexts.
+
+        "simplify" removes unused operations and substitutes subtrees with their previously cached equivalents.
+        "bq_execution" is the heaviest option, preparing the plan for BigQuery execution by also caching subtrees and uploading large local sources.
         """
-        plan = self.replace_cached_subtrees(root)
+        # TODO: We should model plan decomposition and data uploading as work steps rather than as plan preparation.
+        if (
+            target == "bq_execution"
+            and bigframes.options.compute.enable_multi_query_execution
+        ):
+            self._simplify_with_caching(plan)
+
+        plan = self.replace_cached_subtrees(plan)
         plan = rewrite.column_pruning(plan)
         plan = plan.top_down(rewrite.fold_row_counts)
 
-        if locality == "bq_compat":
+        if target == "bq_execution":
             plan = self._substitute_large_local_sources(plan)
 
         return plan
@@ -493,7 +503,10 @@ def _simplify_with_caching(self, plan: nodes.BigFrameNode):
         """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces."""
         # Apply existing caching first
         for _ in range(MAX_SUBTREE_FACTORINGS):
-            if self.prepare_plan(plan).planning_complexity < QUERY_COMPLEXITY_LIMIT:
+            if (
+                self.prepare_plan(plan, "simplify").planning_complexity
+                < QUERY_COMPLEXITY_LIMIT
+            ):
                 return
 
             did_cache = self._cache_most_complex_subtree(plan)
@@ -518,29 +531,6 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool:
         self._cache_with_cluster_cols(bigframes.core.ArrayValue(selection), [])
         return True
 
-    def _validate_result_schema(
-        self,
-        array_value: bigframes.core.ArrayValue,
-        bq_schema: list[bigquery.SchemaField],
-    ):
-        actual_schema = _sanitize(tuple(bq_schema))
-        ibis_schema = compile.test_only_ibis_inferred_schema(
-            self.prepare_plan(array_value.node, locality="original")
-        ).to_bigquery()
-        internal_schema = _sanitize(array_value.schema.to_bigquery())
-        if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
-            return
-
-        if internal_schema != actual_schema:
-            raise ValueError(
-                f"This error should only occur while testing. BigFrames internal schema: {internal_schema} does not match actual schema: {actual_schema}"
-            )
-
-        if ibis_schema != actual_schema:
-            raise ValueError(
-                f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}"
-            )
-
     def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode):
         """
         Replace large local sources with the uploaded version of those datasources.
@@ -604,10 +594,7 @@ def _execute_plan_gbq(
         og_plan = plan
         og_schema = plan.schema
 
-        plan = self.prepare_plan(plan, locality="bq_compat")
-        if bigframes.options.compute.enable_multi_query_execution:
-            self._simplify_with_caching(plan)
-
+        plan = self.prepare_plan(plan, target="bq_execution")
         create_table = must_create_table
         cluster_cols: Sequence[str] = []
         if cache_spec is not None:
@@ -674,12 +661,6 @@ def _execute_plan_gbq(
                 "`bigframes.options.compute.allow_large_results=True`."
             )
             warnings.warn(msg, FutureWarning)
-        # Runs strict validations to ensure internal type predictions and ibis are completely in sync
-        # Do not execute these validations outside of testing suite.
-        if "PYTEST_CURRENT_TEST" in os.environ:
-            self._validate_result_schema(
-                bigframes.core.ArrayValue(plan), iterator.schema
-            )
 
         return executor.ExecuteResult(
             _arrow_batches=iterator.to_arrow_iterable(
@@ -705,19 +686,3 @@ def _if_schema_match(
         ):
             return False
     return True
-
-
-def _sanitize(
-    schema: Tuple[bigquery.SchemaField, ...]
-) -> Tuple[bigquery.SchemaField, ...]:
-    # Schema inferred from SQL strings and Ibis expressions contain only names, types and modes,
-    # so we disregard other fields (e.g timedelta description for timedelta columns) for validations.
-    return tuple(
-        bigquery.SchemaField(
-            f.name,
-            f.field_type,
-            f.mode,  # type:ignore
-            fields=_sanitize(f.fields),
-        )
-        for f in schema
-    )
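
To make the two prepare_plan modes concrete, here is a minimal, self-contained sketch of the pattern this refactor converges on. The helper names below (replace_cached_subtrees, column_pruning, upload_large_local_sources) are toy stand-ins that annotate a string "plan"; they are not the real bigframes rewrites.

from typing import Literal

# Toy stand-ins for the real plan rewrites; each just tags the plan string.
def replace_cached_subtrees(plan: str) -> str:
    return plan + " +cache-substitution"

def column_pruning(plan: str) -> str:
    return plan + " +column-pruning"

def upload_large_local_sources(plan: str) -> str:
    return plan + " +local-source-upload"

def prepare_plan(plan: str, target: Literal["simplify", "bq_execution"] = "simplify") -> str:
    # Every caller gets cache substitution and pruning; only the BigQuery
    # execution path pays for uploading large local data sources.
    plan = replace_cached_subtrees(plan)
    plan = column_pruning(plan)
    if target == "bq_execution":
        plan = upload_large_local_sources(plan)
    return plan

print(prepare_plan("scan(table)"))                           # light path, e.g. to_sql / semi-executors
print(prepare_plan("scan(table)", target="bq_execution"))    # heavy path, e.g. _export_gbq / _execute_plan_gbq

The design choice visible in the diff is that callers now pick an intent ("simplify" vs "bq_execution") rather than a low-level locality flag, so the heavy steps live in one place.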

tests/system/small/test_session.py

Lines changed: 6 additions & 2 deletions
@@ -115,7 +115,9 @@ def test_read_gbq_tokyo(
     # use_explicit_destination=True, otherwise might use path with no query_job
     exec_result = session_tokyo._executor.execute(
         df._block.expr,
-        bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False),
+        bigframes.session.execution_spec.ExecutionSpec(
+            bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False
+        ),
     )
     assert exec_result.query_job is not None
     assert exec_result.query_job.location == tokyo_location
@@ -899,7 +901,9 @@ def test_read_pandas_tokyo(
 
     result = session_tokyo._executor.execute(
         df._block.expr,
-        bigframes.session.execution_spec.ExecutionSpec(promise_under_10gb=False),
+        bigframes.session.execution_spec.ExecutionSpec(
+            bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False
+        ),
     )
     assert result.query_job is not None
     assert result.query_job.location == tokyo_location
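
For context on the test change, the executor branches on execution_spec.destination_spec, so the Tokyo tests now pass an explicit cache destination to stay on the BigQuery path. The sketch below mirrors the shape of that call with illustrative dataclasses; the real ExecutionSpec and CacheSpec live in bigframes.session.execution_spec, and any fields beyond destination_spec and promise_under_10gb are assumptions.

from dataclasses import dataclass
from typing import Optional, Tuple

# Illustrative stand-ins only, not the real bigframes definitions.
@dataclass(frozen=True)
class CacheSpec:
    cluster_cols: Tuple[str, ...]  # an empty tuple: cache without clustering

@dataclass(frozen=True)
class ExecutionSpec:
    destination_spec: Optional[CacheSpec] = None  # None lets a semi-executor bypass BigQuery
    promise_under_10gb: bool = False

# Mirrors the updated Tokyo tests: supplying a cache destination keeps execution on
# the BigQuery path, so the result carries a query_job whose location can be asserted.
spec = ExecutionSpec(CacheSpec(()), promise_under_10gb=False)
print(spec)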
