
Commit f11a100

Merge branch 'main' into df-where-callable
2 parents 81481e6 + b692713 commit f11a100

File tree: 65 files changed (+1973, -253 lines)


bigframes/_config/display_options.py

Lines changed: 7 additions & 1 deletion
@@ -26,8 +26,12 @@
 class DisplayOptions:
     __doc__ = vendored_pandas_config.display_options_doc
 
+    # Options borrowed from pandas.
     max_columns: int = 20
-    max_rows: int = 25
+    max_rows: int = 10
+    precision: int = 6
+
+    # Options unique to BigQuery DataFrames.
     progress_bar: Optional[str] = "auto"
     repr_mode: Literal["head", "deferred", "anywidget"] = "head"
 
@@ -52,6 +56,8 @@ def pandas_repr(display_options: DisplayOptions):
         display_options.max_columns,
         "display.max_rows",
         display_options.max_rows,
+        "display.precision",
+        display_options.precision,
         "display.show_dimensions",
         True,
     ) as pandas_context:
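
Note: the new precision option mirrors pandas' display.precision and is threaded into the pandas.option_context used for repr. A minimal sketch of the user-facing effect, assuming the option is surfaced as bigframes.pandas.options.display.precision (the exact attribute path is an assumption here, and creating a DataFrame still requires a configured session):

    import bigframes.pandas as bpd

    # Assumption: display options are exposed under bpd.options.display.
    bpd.options.display.precision = 2   # option added in this commit
    bpd.options.display.max_rows = 10   # new default, down from 25

    df = bpd.DataFrame({"x": [1.23456789, 2.3456789]})
    # repr() delegates to pandas with display.precision=2, so floats
    # render with two decimal places instead of six.
    print(df)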

bigframes/blob/_functions.py

Lines changed: 6 additions & 2 deletions
@@ -473,7 +473,9 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str:
     return result_json
 
 
-pdf_extract_def = FunctionDef(pdf_extract_func, ["pypdf", "requests", "pypdf[crypto]"])
+pdf_extract_def = FunctionDef(
+    pdf_extract_func, ["pypdf>=5.3.1,<6.0.0", "requests", "cryptography==43.0.3"]
+)
 
 
 # Extracts text from a PDF url and chunks it simultaneously
@@ -527,4 +529,6 @@ def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str:
     return result_json
 
 
-pdf_chunk_def = FunctionDef(pdf_chunk_func, ["pypdf", "requests", "pypdf[crypto]"])
+pdf_chunk_def = FunctionDef(
+    pdf_chunk_func, ["pypdf>=5.3.1,<6.0.0", "requests", "cryptography==43.0.3"]
+)

bigframes/core/blocks.py

Lines changed: 25 additions & 9 deletions
@@ -387,25 +387,39 @@ def reversed(self) -> Block:
             index_labels=self.index.names,
         )
 
-    def reset_index(self, drop: bool = True) -> Block:
+    def reset_index(self, level: LevelsType = None, drop: bool = True) -> Block:
         """Reset the index of the block, promoting the old index to a value column.
 
         Arguments:
+            level: the label or index level of the index levels to remove.
            name: this is the column id for the new value id derived from the old index
 
        Returns:
            A new Block because dropping index columns can break references
            from Index classes that point to this block.
        """
+        if level:
+            # preserve original order, not user provided order
+            level_ids: Sequence[str] = [
+                id for id in self.index_columns if id in self.index.resolve_level(level)
+            ]
+        else:
+            level_ids = self.index_columns
+
         expr = self._expr
-        if (
+        if set(self.index_columns) > set(level_ids):
+            new_index_cols = [col for col in self.index_columns if col not in level_ids]
+            new_index_labels = [self.col_id_to_index_name[id] for id in new_index_cols]
+        elif (
             self.session._default_index_type
             == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
         ):
             expr, new_index_col_id = expr.promote_offsets()
             new_index_cols = [new_index_col_id]
+            new_index_labels = [None]
         elif self.session._default_index_type == bigframes.enums.DefaultIndexKind.NULL:
             new_index_cols = []
+            new_index_labels = []
         else:
             raise ValueError(
                 f"Unrecognized default index kind: {self.session._default_index_type}"
@@ -415,22 +429,23 @@ def reset_index(self, drop: bool = True) -> Block:
             # Even though the index might be part of the ordering, keep that
             # ordering expression as reset_index shouldn't change the row
             # order.
-            expr = expr.drop_columns(self.index_columns)
+            expr = expr.drop_columns(level_ids)
             return Block(
                 expr,
                 index_columns=new_index_cols,
+                index_labels=new_index_labels,
                 column_labels=self.column_labels,
             )
         else:
             # Add index names to column index
-            index_labels = self.index.names
             column_labels_modified = self.column_labels
-            for level, label in enumerate(index_labels):
+            for position, level_id in enumerate(level_ids):
+                label = self.col_id_to_index_name[level_id]
                 if label is None:
-                    if "index" not in self.column_labels and len(index_labels) <= 1:
+                    if "index" not in self.column_labels and self.index.nlevels <= 1:
                         label = "index"
                     else:
-                        label = f"level_{level}"
+                        label = f"level_{self.index_columns.index(level_id)}"
 
                 if label in self.column_labels:
                     raise ValueError(f"cannot insert {label}, already exists")
@@ -439,11 +454,12 @@ def reset_index(self, drop: bool = True) -> Block:
                 label = tuple(label if i == 0 else "" for i in range(nlevels))
                 # Create index copy with label inserted
                 # See: https://pandas.pydata.org/docs/reference/api/pandas.Index.insert.html
-                column_labels_modified = column_labels_modified.insert(level, label)
+                column_labels_modified = column_labels_modified.insert(position, label)
 
             return Block(
-                expr,
+                expr.select_columns((*new_index_cols, *level_ids, *self.value_columns)),
                 index_columns=new_index_cols,
+                index_labels=new_index_labels,
                 column_labels=column_labels_modified,
             )
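
Note: Block.reset_index now accepts a level argument so that only the requested index levels are demoted to value columns, matching pandas.DataFrame.reset_index(level=...). A minimal user-facing sketch, assuming the DataFrame layer forwards level to this Block method:

    import bigframes.pandas as bpd

    df = bpd.DataFrame(
        {"a": [1, 1, 2], "b": ["x", "y", "z"], "v": [10, 20, 30]}
    ).set_index(["a", "b"])

    # Drop only the "a" level back into the columns; "b" stays as the index.
    partial = df.reset_index(level="a")

    # Passing no level still resets every index level, as before.
    full = df.reset_index()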

bigframes/core/compile/polars/compiler.py

Lines changed: 4 additions & 0 deletions
@@ -198,6 +198,10 @@ def _(self, op: ops.ScalarOp, l_input: pl.Expr, r_input: pl.Expr) -> pl.Expr:
     def _(self, op: ops.ScalarOp, l_input: pl.Expr, r_input: pl.Expr) -> pl.Expr:
         return l_input | r_input
 
+    @compile_op.register(bool_ops.XorOp)
+    def _(self, op: ops.ScalarOp, l_input: pl.Expr, r_input: pl.Expr) -> pl.Expr:
+        return l_input ^ r_input
+
     @compile_op.register(num_ops.AddOp)
     def _(self, op: ops.ScalarOp, l_input: pl.Expr, r_input: pl.Expr) -> pl.Expr:
         return l_input + r_input
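
Note: with XorOp registered, boolean exclusive-or between Series can now be compiled on the Polars local backend as well. A short sketch of the operation this handler covers (engine selection is implicit; treat it as illustrative):

    import bigframes.pandas as bpd

    s1 = bpd.Series([True, True, False, False])
    s2 = bpd.Series([True, False, True, False])

    # Element-wise exclusive or; compiles to `l_input ^ r_input` in Polars.
    result = s1 ^ s2  # -> False, True, True, False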

bigframes/core/compile/sqlglot/aggregate_compiler.py

Lines changed: 19 additions & 1 deletion
@@ -15,7 +15,7 @@
 
 import sqlglot.expressions as sge
 
-from bigframes.core import expression
+from bigframes.core import expression, window_spec
 from bigframes.core.compile.sqlglot.aggregations import (
     binary_compiler,
     nullary_compiler,
@@ -56,3 +56,21 @@ def compile_aggregate(
         return binary_compiler.compile(aggregate.op, left, right)
     else:
         raise ValueError(f"Unexpected aggregation: {aggregate}")
+
+
+def compile_analytic(
+    aggregate: expression.Aggregation,
+    window: window_spec.WindowSpec,
+) -> sge.Expression:
+    if isinstance(aggregate, expression.NullaryAggregation):
+        return nullary_compiler.compile(aggregate.op)
+    if isinstance(aggregate, expression.UnaryAggregation):
+        column = typed_expr.TypedExpr(
+            scalar_compiler.compile_scalar_expression(aggregate.arg),
+            aggregate.arg.output_type,
+        )
+        return unary_compiler.compile(aggregate.op, column, window)
+    elif isinstance(aggregate, expression.BinaryAggregation):
+        raise NotImplementedError("binary analytic operations not yet supported")
+    else:
+        raise ValueError(f"Unexpected analytic operation: {aggregate}")
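
Note: compile_analytic mirrors compile_aggregate but applies the aggregation over a window rather than a GROUP BY; it is the entry point called by the new compile_window node handler below. A hedged sketch of the kind of user expression that reaches this path, assuming cumulative ops compile as unary aggregations over an unbounded window:

    import bigframes.pandas as bpd

    s = bpd.Series([1, 2, None, 4])

    # A cumulative sum is a unary aggregation evaluated over an expanding
    # window, so it is compiled via compile_analytic rather than compile_aggregate.
    s.cumsum()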

bigframes/core/compile/sqlglot/aggregations/unary_compiler.py

Lines changed: 14 additions & 1 deletion
@@ -18,6 +18,7 @@
 
 import sqlglot.expressions as sge
 
+from bigframes import dtypes
 from bigframes.core import window_spec
 import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
 from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
@@ -36,14 +37,26 @@ def compile(
     return UNARY_OP_REGISTRATION[op](op, column, window=window)
 
 
+@UNARY_OP_REGISTRATION.register(agg_ops.CountOp)
+def _(
+    op: agg_ops.CountOp,
+    column: typed_expr.TypedExpr,
+    window: typing.Optional[window_spec.WindowSpec] = None,
+) -> sge.Expression:
+    return apply_window_if_present(sge.func("COUNT", column.expr), window)
+
+
 @UNARY_OP_REGISTRATION.register(agg_ops.SumOp)
 def _(
     op: agg_ops.SumOp,
     column: typed_expr.TypedExpr,
     window: typing.Optional[window_spec.WindowSpec] = None,
 ) -> sge.Expression:
+    expr = column.expr
+    if column.dtype == dtypes.BOOL_DTYPE:
+        expr = sge.Cast(this=column.expr, to="INT64")
     # Will be null if all inputs are null. Pandas defaults to zero sum though.
-    expr = apply_window_if_present(sge.func("SUM", column.expr), window)
+    expr = apply_window_if_present(sge.func("SUM", expr), window)
    return sge.func("IFNULL", expr, ir._literal(0, column.dtype))
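
Note: the SumOp change casts boolean inputs to INT64 before summing, since BigQuery's SUM does not accept BOOL; CountOp is a straightforward COUNT with an optional window. A minimal sketch of what that enables at the user level (the SQL shown in comments is an expectation derived from the diff, not captured output):

    import bigframes.pandas as bpd

    flags = bpd.Series([True, False, True, None])

    # Sums of boolean Series now compile to roughly
    #   IFNULL(SUM(CAST(col AS INT64)), 0)
    # instead of handing a BOOL input to SUM.
    flags.sum()

    # COUNT ignores NULLs, so this counts the three non-null flags.
    flags.count()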

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,75 @@ def compile_aggregate(
298298

299299
return child.aggregate(aggregations, by_cols, tuple(dropna_cols))
300300

301+
@_compile_node.register
302+
def compile_window(
303+
self, node: nodes.WindowOpNode, child: ir.SQLGlotIR
304+
) -> ir.SQLGlotIR:
305+
window_spec = node.window_spec
306+
if node.expression.op.order_independent and window_spec.is_unbounded:
307+
# notably percentile_cont does not support ordering clause
308+
window_spec = window_spec.without_order()
309+
310+
window_op = aggregate_compiler.compile_analytic(node.expression, window_spec)
311+
312+
inputs: tuple[sge.Expression, ...] = tuple(
313+
scalar_compiler.compile_scalar_expression(expression.DerefOp(column))
314+
for column in node.expression.column_references
315+
)
316+
317+
clauses: list[tuple[sge.Expression, sge.Expression]] = []
318+
if node.expression.op.skips_nulls and not node.never_skip_nulls:
319+
for column in inputs:
320+
clauses.append((sge.Is(this=column, expression=sge.Null()), sge.Null()))
321+
322+
if window_spec.min_periods and len(inputs) > 0:
323+
if node.expression.op.skips_nulls:
324+
# Most operations do not count NULL values towards min_periods
325+
not_null_columns = [
326+
sge.Not(this=sge.Is(this=column, expression=sge.Null()))
327+
for column in inputs
328+
]
329+
# All inputs must be non-null for observation to count
330+
if not not_null_columns:
331+
is_observation_expr: sge.Expression = sge.convert(True)
332+
else:
333+
is_observation_expr = not_null_columns[0]
334+
for expr in not_null_columns[1:]:
335+
is_observation_expr = sge.And(
336+
this=is_observation_expr, expression=expr
337+
)
338+
is_observation = ir._cast(is_observation_expr, "INT64")
339+
observation_count = windows.apply_window_if_present(
340+
sge.func("SUM", is_observation), window_spec
341+
)
342+
else:
343+
# Operations like count treat even NULLs as valid observations
344+
# for the sake of min_periods notnull is just used to convert
345+
# null values to non-null (FALSE) values to be counted.
346+
is_observation = ir._cast(
347+
sge.Not(this=sge.Is(this=inputs[0], expression=sge.Null())),
348+
"INT64",
349+
)
350+
observation_count = windows.apply_window_if_present(
351+
sge.func("COUNT", is_observation), window_spec
352+
)
353+
354+
clauses.append(
355+
(
356+
observation_count < sge.convert(window_spec.min_periods),
357+
sge.Null(),
358+
)
359+
)
360+
if clauses:
361+
when_expressions = [sge.When(this=cond, true=res) for cond, res in clauses]
362+
window_op = sge.Case(ifs=when_expressions, default=window_op)
363+
364+
# TODO: check if we can directly window the expression.
365+
return child.window(
366+
window_op=window_op,
367+
output_column_id=node.output_name.sql,
368+
)
369+
301370

302371
def _replace_unsupported_ops(node: nodes.BigFrameNode):
303372
node = nodes.bottom_up(node, rewrite.rewrite_slice)
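
Note: compile_window wraps the compiled analytic expression in CASE clauses so NULL inputs and min_periods behave like pandas: for null-skipping ops, rows with NULL inputs produce NULL, and rows whose window holds fewer than min_periods observations also produce NULL. A user-level sketch of the semantics this targets, assuming the rolling API mirrors pandas and forwards min_periods:

    import bigframes.pandas as bpd

    s = bpd.Series([1.0, None, 3.0, 4.0, 5.0])

    # Window of 3 with min_periods=2: the first row has only one observation
    # in its window, so it becomes NULL; the NULL input itself is not counted
    # toward min_periods because SumOp skips nulls.
    s.rolling(window=3, min_periods=2).sum()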

bigframes/core/compile/sqlglot/expressions/binary_compiler.py

Lines changed: 47 additions & 7 deletions
@@ -73,6 +73,51 @@ def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
     )
 
 
+@BINARY_OP_REGISTRATION.register(ops.div_op)
+def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
+    left_expr = left.expr
+    if left.dtype == dtypes.BOOL_DTYPE:
+        left_expr = sge.Cast(this=left_expr, to="INT64")
+    right_expr = right.expr
+    if right.dtype == dtypes.BOOL_DTYPE:
+        right_expr = sge.Cast(this=right_expr, to="INT64")
+
+    result = sge.func("IEEE_DIVIDE", left_expr, right_expr)
+    if left.dtype == dtypes.TIMEDELTA_DTYPE and dtypes.is_numeric(right.dtype):
+        return sge.Cast(this=sge.Floor(this=result), to="INT64")
+    else:
+        return result
+
+
+@BINARY_OP_REGISTRATION.register(ops.ge_op)
+def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
+    return sge.GTE(this=left.expr, expression=right.expr)
+
+
+@BINARY_OP_REGISTRATION.register(ops.JSONSet)
+def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
+    return sge.func("JSON_SET", left.expr, sge.convert(op.json_path), right.expr)
+
+
+@BINARY_OP_REGISTRATION.register(ops.mul_op)
+def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
+    left_expr = left.expr
+    if left.dtype == dtypes.BOOL_DTYPE:
+        left_expr = sge.Cast(this=left_expr, to="INT64")
+    right_expr = right.expr
+    if right.dtype == dtypes.BOOL_DTYPE:
+        right_expr = sge.Cast(this=right_expr, to="INT64")
+
+    result = sge.Mul(this=left_expr, expression=right_expr)
+
+    if (dtypes.is_numeric(left.dtype) and right.dtype == dtypes.TIMEDELTA_DTYPE) or (
+        left.dtype == dtypes.TIMEDELTA_DTYPE and dtypes.is_numeric(right.dtype)
+    ):
+        return sge.Cast(this=sge.Floor(this=result), to="INT64")
+    else:
+        return result
+
+
 @BINARY_OP_REGISTRATION.register(ops.sub_op)
 def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
     if dtypes.is_numeric(left.dtype) and dtypes.is_numeric(right.dtype):
@@ -115,11 +160,6 @@ def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
     )
 
 
-@BINARY_OP_REGISTRATION.register(ops.ge_op)
+@BINARY_OP_REGISTRATION.register(ops.obj_make_ref_op)
 def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
-    return sge.GTE(this=left.expr, expression=right.expr)
-
-
-@BINARY_OP_REGISTRATION.register(ops.JSONSet)
-def _(op, left: TypedExpr, right: TypedExpr) -> sge.Expression:
-    return sge.func("JSON_SET", left.expr, sge.convert(op.json_path), right.expr)
+    return sge.func("OBJ.MAKE_REF", left.expr, right.expr)
