Commit 2d935f6

Merge branch 'main' into rf-vpc-fix

2 parents a870487 + 49b91e8, commit 2d935f6

34 files changed: +469 −280 lines

bigframes/core/blocks.py

Lines changed: 25 additions & 86 deletions
```diff
@@ -27,7 +27,6 @@
 import functools
 import itertools
 import random
-import textwrap
 import typing
 from typing import (
     Iterable,
@@ -54,16 +53,13 @@
 from bigframes.core import agg_expressions, local_data
 import bigframes.core as core
 import bigframes.core.agg_expressions as ex_types
-import bigframes.core.compile.googlesql as googlesql
 import bigframes.core.expression as ex
 import bigframes.core.expression as scalars
 import bigframes.core.guid as guid
 import bigframes.core.identifiers
 import bigframes.core.join_def as join_defs
 import bigframes.core.ordering as ordering
 import bigframes.core.pyarrow_utils as pyarrow_utils
-import bigframes.core.schema as bf_schema
-import bigframes.core.sql as sql
 import bigframes.core.utils as utils
 import bigframes.core.window_spec as windows
 import bigframes.dtypes
@@ -2776,14 +2772,6 @@ def _throw_if_null_index(self, opname: str):
         )

     def _get_rows_as_json_values(self) -> Block:
-        # We want to preserve any ordering currently present before turning to
-        # direct SQL manipulation. We will restore the ordering when we rebuild
-        # expression.
-        # TODO(shobs): Replace direct SQL manipulation by structured expression
-        # manipulation
-        expr, ordering_column_name = self.expr.promote_offsets()
-        expr_sql = self.session._executor.to_sql(expr)
-
         # Names of the columns to serialize for the row.
         # We will use the repr-eval pattern to serialize a value here and
         # deserialize in the cloud function. Let's make sure that would work.
@@ -2799,93 +2787,44 @@ def _get_rows_as_json_values(self) -> Block:
             )

             column_names.append(serialized_column_name)
-        column_names_csv = sql.csv(map(sql.simple_literal, column_names))
-
-        # index columns count
-        index_columns_count = len(self.index_columns)

         # column references to form the array of values for the row
         column_types = list(self.index.dtypes) + list(self.dtypes)
         column_references = []
         for type_, col in zip(column_types, self.expr.column_ids):
-            if isinstance(type_, pd.ArrowDtype) and pa.types.is_binary(
-                type_.pyarrow_dtype
-            ):
-                column_references.append(sql.to_json_string(col))
+            if type_ == bigframes.dtypes.BYTES_DTYPE:
+                column_references.append(ops.ToJSONString().as_expr(col))
+            elif type_ == bigframes.dtypes.BOOL_DTYPE:
+                # cast operator produces True/False, but function template expects lower case
+                column_references.append(
+                    ops.lower_op.as_expr(
+                        ops.AsTypeOp(bigframes.dtypes.STRING_DTYPE).as_expr(col)
+                    )
+                )
             else:
-                column_references.append(sql.cast_as_string(col))
-
-        column_references_csv = sql.csv(column_references)
-
-        # types of the columns to serialize for the row
-        column_types_csv = sql.csv(
-            [sql.simple_literal(str(typ)) for typ in column_types]
-        )
+                column_references.append(
+                    ops.AsTypeOp(bigframes.dtypes.STRING_DTYPE).as_expr(col)
+                )

         # row dtype to use for deserializing the row as pandas series
         pandas_row_dtype = bigframes.dtypes.lcd_type(*column_types)
         if pandas_row_dtype is None:
             pandas_row_dtype = "object"
-        pandas_row_dtype = sql.simple_literal(str(pandas_row_dtype))
-
-        # create a json column representing row through SQL manipulation
-        row_json_column_name = guid.generate_guid()
-        select_columns = (
-            [ordering_column_name] + list(self.index_columns) + [row_json_column_name]
-        )
-        select_columns_csv = sql.csv(
-            [googlesql.identifier(col) for col in select_columns]
-        )
-        json_sql = f"""\
-With T0 AS (
-{textwrap.indent(expr_sql, "    ")}
-),
-T1 AS (
-    SELECT *,
-           TO_JSON_STRING(JSON_OBJECT(
-               "names", [{column_names_csv}],
-               "types", [{column_types_csv}],
-               "values", [{column_references_csv}],
-               "indexlength", {index_columns_count},
-               "dtype", {pandas_row_dtype}
-           )) AS {googlesql.identifier(row_json_column_name)} FROM T0
-)
-SELECT {select_columns_csv} FROM T1
-"""
-        # The only ways this code is used is through df.apply(axis=1) cope path
-        destination, query_job = self.session._loader._query_to_destination(
-            json_sql, cluster_candidates=[ordering_column_name]
-        )
-        if not destination:
-            raise ValueError(f"Query job {query_job} did not produce result table")
-
-        new_schema = (
-            self.expr.schema.select([*self.index_columns])
-            .append(
-                bf_schema.SchemaItem(
-                    row_json_column_name, bigframes.dtypes.STRING_DTYPE
-                )
-            )
-            .append(
-                bf_schema.SchemaItem(ordering_column_name, bigframes.dtypes.INT_DTYPE)
-            )
-        )
+        pandas_row_dtype = str(pandas_row_dtype)

-        dest_table = self.session.bqclient.get_table(destination)
-        expr = core.ArrayValue.from_table(
-            dest_table,
-            schema=new_schema,
-            session=self.session,
-            offsets_col=ordering_column_name,
-            n_rows=dest_table.num_rows,
-        ).drop_columns([ordering_column_name])
-        block = Block(
-            expr,
-            index_columns=self.index_columns,
-            column_labels=[row_json_column_name],
-            index_labels=self._index_labels,
+        struct_op = ops.StructOp(
+            column_names=("names", "types", "values", "indexlength", "dtype")
         )
-        return block
+        names_val = ex.const(tuple(column_names))
+        types_val = ex.const(tuple(map(str, column_types)))
+        values_val = ops.ToArrayOp().as_expr(*column_references)
+        indexlength_val = ex.const(len(self.index_columns))
+        dtype_val = ex.const(str(pandas_row_dtype))
+        struct_expr = struct_op.as_expr(
+            names_val, types_val, values_val, indexlength_val, dtype_val
+        )
+        block, col_id = self.project_expr(ops.ToJSONString().as_expr(struct_expr))
+        return block.select_column(col_id)


 class BlockIndexProperties:
```
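The rewrite replaces hand-built SQL with structured expression ops, but the JSON payload keeps the same shape: a struct with `names`, `types`, `values`, `indexlength`, and `dtype`, serialized via `TO_JSON_STRING`. A minimal sketch of that payload and of how a consumer (e.g., the cloud function mentioned in the comments) might rebuild the row; the column names and values here are made up:

```python
import json

import pandas as pd

# Hypothetical payload mirroring the struct built in _get_rows_as_json_values:
# every value is pre-cast to STRING, and "dtype" carries the least-common dtype.
# Column names are serialized with repr (the repr-eval pattern noted above).
row_json = json.dumps(
    {
        "names": ["'idx'", "'a'", "'b'"],
        "types": ["Int64", "Int64", "boolean"],
        "values": ["0", "42", "true"],
        "indexlength": 1,
        "dtype": "object",
    }
)

# Sketch of the consuming side: skip the index columns and rebuild the row
# as a pandas Series keyed by the deserialized column names.
payload = json.loads(row_json)
n = payload["indexlength"]
row = pd.Series(
    payload["values"][n:],
    index=[eval(name) for name in payload["names"][n:]],  # eval undoes repr
    dtype=payload["dtype"],
)
```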

bigframes/core/compile/ibis_compiler/scalar_op_registry.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -1301,8 +1301,8 @@ def parse_json_op_impl(x: ibis_types.Value, op: ops.ParseJSON):


 @scalar_op_compiler.register_unary_op(ops.ToJSONString)
-def to_json_string_op_impl(json_obj: ibis_types.Value):
-    return to_json_string(json_obj=json_obj)
+def to_json_string_op_impl(x: ibis_types.Value):
+    return to_json_string(value=x)


 @scalar_op_compiler.register_unary_op(ops.JSONValue, pass_op=True)
@@ -2069,9 +2069,9 @@ def json_extract_string_array(  # type: ignore[empty-body]


 @ibis_udf.scalar.builtin(name="to_json_string")
 def to_json_string(  # type: ignore[empty-body]
-    json_obj: ibis_dtypes.JSON,
+    value,
 ) -> ibis_dtypes.String:
-    """Convert JSON to STRING."""
+    """Convert value to JSON-formatted string."""


 @ibis_udf.scalar.builtin(name="json_value")
```

bigframes/core/compile/ibis_types.py

Lines changed: 0 additions & 4 deletions
```diff
@@ -386,10 +386,6 @@ def literal_to_ibis_scalar(
     ibis_dtype = bigframes_dtype_to_ibis_dtype(force_dtype) if force_dtype else None

     if pd.api.types.is_list_like(literal):
-        if validate:
-            raise ValueError(
-                f"List types can't be stored in BigQuery DataFrames. {constants.FEEDBACK_LINK}"
-            )
         # "correct" way would be to use ibis.array, but this produces invalid BQ SQL syntax
         return tuple(literal)
```
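With the `validate` guard gone, list-like literals fall through to the tuple branch instead of raising. A simplified stand-in for that branch (the function name here is hypothetical, not the real `literal_to_ibis_scalar`):

```python
import pandas as pd


def literal_passthrough(literal):
    # List-likes are no longer rejected up front; they pass through as tuples,
    # matching the surviving branch above.
    if pd.api.types.is_list_like(literal):
        return tuple(literal)
    return literal


assert literal_passthrough([1, 2, 3]) == (1, 2, 3)
```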
bigframes/core/groupby/dataframe_group_by.py

Lines changed: 13 additions & 13 deletions
```diff
@@ -461,23 +461,19 @@ def expanding(self, min_periods: int = 1) -> windows.Window:

     def agg(self, func=None, **kwargs) -> typing.Union[df.DataFrame, series.Series]:
         if func:
-            if isinstance(func, str):
-                return self.size() if func == "size" else self._agg_string(func)
-            elif utils.is_dict_like(func):
+            if utils.is_dict_like(func):
                 return self._agg_dict(func)
             elif utils.is_list_like(func):
                 return self._agg_list(func)
             else:
-                raise NotImplementedError(
-                    f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}"
-                )
+                return self.size() if func == "size" else self._agg_func(func)
         else:
             return self._agg_named(**kwargs)

-    def _agg_string(self, func: str) -> df.DataFrame:
+    def _agg_func(self, func) -> df.DataFrame:
         ids, labels = self._aggregated_columns()
         aggregations = [
-            aggs.agg(col_id, agg_ops.lookup_agg_func(func)) for col_id in ids
+            aggs.agg(col_id, agg_ops.lookup_agg_func(func)[0]) for col_id in ids
         ]
         agg_block, _ = self._block.aggregate(
             by_column_ids=self._by_col_ids,
@@ -500,7 +496,7 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
                 funcs_for_id if utils.is_list_like(funcs_for_id) else [funcs_for_id]
             )
             for f in func_list:
-                aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)))
+                aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0]))
                 column_labels.append(label)
         agg_block, _ = self._block.aggregate(
             by_column_ids=self._by_col_ids,
@@ -525,19 +521,23 @@ def _agg_dict(self, func: typing.Mapping) -> df.DataFrame:
     def _agg_list(self, func: typing.Sequence) -> df.DataFrame:
         ids, labels = self._aggregated_columns()
         aggregations = [
-            aggs.agg(col_id, agg_ops.lookup_agg_func(f)) for col_id in ids for f in func
+            aggs.agg(col_id, agg_ops.lookup_agg_func(f)[0])
+            for col_id in ids
+            for f in func
         ]

         if self._block.column_labels.nlevels > 1:
             # Restructure MultiIndex for proper format: (idx1, idx2, func)
             # rather than ((idx1, idx2), func).
             column_labels = [
-                tuple(label) + (f,)
+                tuple(label) + (agg_ops.lookup_agg_func(f)[1],)
                 for label in labels.to_frame(index=False).to_numpy()
                 for f in func
             ]
         else:  # Single-level index
-            column_labels = [(label, f) for label in labels for f in func]
+            column_labels = [
+                (label, agg_ops.lookup_agg_func(f)[1]) for label in labels for f in func
+            ]

         agg_block, _ = self._block.aggregate(
             by_column_ids=self._by_col_ids,
@@ -563,7 +563,7 @@ def _agg_named(self, **kwargs) -> df.DataFrame:
             if not isinstance(v, tuple) or (len(v) != 2):
                 raise TypeError("kwargs values must be 2-tuples of column, aggfunc")
             col_id = self._resolve_label(v[0])
-            aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(v[1])))
+            aggregations.append(aggs.agg(col_id, agg_ops.lookup_agg_func(v[1])[0]))
             column_labels.append(k)
         agg_block, _ = self._block.aggregate(
             by_column_ids=self._by_col_ids,
```
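Every call site now unpacks `lookup_agg_func` with `[0]` (the aggregation op) and `[1]` (the display label), which implies it returns a pair rather than a bare op. A hypothetical sketch of that inferred contract; `_AGG_OPS` and its string values are placeholders, not the real bigframes op objects:

```python
from typing import Callable, Tuple, Union

_AGG_OPS = {"mean": "MeanOp()", "sum": "SumOp()"}  # placeholder op objects


def lookup_agg_func(func: Union[str, Callable]) -> Tuple[object, str]:
    """Inferred contract: return (aggregation op, display label)."""
    if isinstance(func, str):
        return _AGG_OPS[func], func
    # A callable such as np.mean would resolve to an op, labeled via __name__.
    return _AGG_OPS[func.__name__], func.__name__


op, label = lookup_agg_func("mean")  # op is [0], label is [1] at the call sites
```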

bigframes/core/groupby/series_group_by.py

Lines changed: 8 additions & 9 deletions
```diff
@@ -216,18 +216,17 @@ def prod(self, *args) -> series.Series:

     def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]:
         column_names: list[str] = []
-        if isinstance(func, str):
-            aggregations = [aggs.agg(self._value_column, agg_ops.lookup_agg_func(func))]
-            column_names = [func]
-        elif utils.is_list_like(func):
-            aggregations = [
-                aggs.agg(self._value_column, agg_ops.lookup_agg_func(f)) for f in func
-            ]
-            column_names = list(func)
-        else:
+        if utils.is_dict_like(func):
             raise NotImplementedError(
                 f"Aggregate with {func} not supported. {constants.FEEDBACK_LINK}"
             )
+        if not utils.is_list_like(func):
+            func = [func]
+
+        aggregations = [
+            aggs.agg(self._value_column, agg_ops.lookup_agg_func(f)[0]) for f in func
+        ]
+        column_names = [agg_ops.lookup_agg_func(f)[1] for f in func]

         agg_block, _ = self._block.aggregate(
             by_column_ids=self._by_col_ids,
```
bigframes/core/log_adapter.py

Lines changed: 51 additions & 39 deletions
```diff
@@ -149,49 +149,61 @@ def wrap(cls):
     return wrap(decorated_cls)


-def method_logger(method, /, *, custom_base_name: Optional[str] = None):
+def method_logger(method=None, /, *, custom_base_name: Optional[str] = None):
     """Decorator that adds logging functionality to a method."""

-    @functools.wraps(method)
-    def wrapper(*args, **kwargs):
-        api_method_name = getattr(method, LOG_OVERRIDE_NAME, method.__name__)
-        if custom_base_name is None:
-            qualname_parts = getattr(method, "__qualname__", method.__name__).split(".")
-            class_name = qualname_parts[-2] if len(qualname_parts) > 1 else ""
-            base_name = (
-                class_name if class_name else "_".join(method.__module__.split(".")[1:])
-            )
-        else:
-            base_name = custom_base_name
-
-        full_method_name = f"{base_name.lower()}-{api_method_name}"
-        # Track directly called methods
-        if len(_call_stack) == 0:
-            add_api_method(full_method_name)
-
-        _call_stack.append(full_method_name)
-
-        try:
-            return method(*args, **kwargs)
-        except (NotImplementedError, TypeError) as e:
-            # Log method parameters that are implemented in pandas but either missing (TypeError)
-            # or not fully supported (NotImplementedError) in BigFrames.
-            # Logging is currently supported only when we can access the bqclient through
-            # _block.session.bqclient.
-            if len(_call_stack) == 1:
-                submit_pandas_labels(
-                    _get_bq_client(*args, **kwargs),
-                    base_name,
-                    api_method_name,
-                    args,
-                    kwargs,
-                    task=PANDAS_PARAM_TRACKING_TASK,
+    def outer_wrapper(method):
+        @functools.wraps(method)
+        def wrapper(*args, **kwargs):
+            api_method_name = getattr(method, LOG_OVERRIDE_NAME, method.__name__)
+            if custom_base_name is None:
+                qualname_parts = getattr(method, "__qualname__", method.__name__).split(
+                    "."
+                )
+                class_name = qualname_parts[-2] if len(qualname_parts) > 1 else ""
+                base_name = (
+                    class_name
+                    if class_name
+                    else "_".join(method.__module__.split(".")[1:])
                 )
-            raise e
-        finally:
-            _call_stack.pop()
+            else:
+                base_name = custom_base_name

-    return wrapper
+            full_method_name = f"{base_name.lower()}-{api_method_name}"
+            # Track directly called methods
+            if len(_call_stack) == 0:
+                add_api_method(full_method_name)
+
+            _call_stack.append(full_method_name)
+
+            try:
+                return method(*args, **kwargs)
+            except (NotImplementedError, TypeError) as e:
+                # Log method parameters that are implemented in pandas but either missing (TypeError)
+                # or not fully supported (NotImplementedError) in BigFrames.
+                # Logging is currently supported only when we can access the bqclient through
+                # _block.session.bqclient.
+                if len(_call_stack) == 1:
+                    submit_pandas_labels(
+                        _get_bq_client(*args, **kwargs),
+                        base_name,
+                        api_method_name,
+                        args,
+                        kwargs,
+                        task=PANDAS_PARAM_TRACKING_TASK,
+                    )
+                raise e
+            finally:
+                _call_stack.pop()
+
+        return wrapper
+
+    if method is None:
+        # Called with parentheses
+        return outer_wrapper
+
+    # Called without parentheses
+    return outer_wrapper(method)


 def property_logger(prop):
```
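The refactor turns `method_logger` into a decorator usable both bare and with keyword arguments. A minimal standalone sketch of that optional-parentheses pattern (the names here are illustrative, not the bigframes API):

```python
import functools
from typing import Optional


def logged(method=None, /, *, prefix: Optional[str] = None):
    def outer_wrapper(method):
        @functools.wraps(method)
        def wrapper(*args, **kwargs):
            # Log the call, then delegate to the wrapped method.
            print(f"{prefix or ''}{method.__name__} called")
            return method(*args, **kwargs)

        return wrapper

    if method is None:
        return outer_wrapper  # used as @logged(prefix="api-")
    return outer_wrapper(method)  # used as @logged


@logged
def f():
    return 1


@logged(prefix="api-")
def g():
    return 2
```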
