Skip to content

Commit 7bcdffd

Browse files
authored
Merge branch 'main' into main_chelsealin_add
2 parents 5229ec5 + e9bda37 commit 7bcdffd

File tree

19 files changed

+432
-30
lines changed

19 files changed

+432
-30
lines changed

bigframes/core/compile/sqlglot/expressions/unary_compiler.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import typing
1818

19+
import pandas as pd
20+
import pyarrow as pa
1921
import sqlglot
2022
import sqlglot.expressions as sge
2123

@@ -105,6 +107,12 @@ def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
105107
)
106108

107109

110+
@UNARY_OP_REGISTRATION.register(ops.AsTypeOp)
def _(op: ops.AsTypeOp, expr: TypedExpr) -> sge.Expression:
    """Compile an astype op to a SQL CAST expression."""
    # TODO: Support more types for casting, such as JSON, etc.
    # NOTE(review): `to=` is handed op.to_type directly — confirm sqlglot
    # accepts this dtype representation as a CAST target.
    target = op.to_type
    return sge.Cast(this=expr.expr, to=target)
114+
115+
108116
@UNARY_OP_REGISTRATION.register(ops.ArrayToStringOp)
def _(op: ops.ArrayToStringOp, expr: TypedExpr) -> sge.Expression:
    """Compile ArrayToString to ARRAY_TO_STRING(array_expr, delimiter).

    Builds the delimiter literal with sge.convert so special characters are
    escaped correctly: the previous f"'{op.delimiter}'" interpolation produced
    broken SQL whenever the delimiter itself contained a single quote.
    """
    return sge.ArrayToString(this=expr.expr, expression=sge.convert(op.delimiter))
@@ -234,6 +242,12 @@ def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
234242
) - sge.convert(1)
235243

236244

245+
@UNARY_OP_REGISTRATION.register(ops.FloorDtOp)
def _(op: ops.FloorDtOp, expr: TypedExpr) -> sge.Expression:
    """Truncate a timestamp expression down to the op's frequency unit."""
    # TODO: Remove this method when it is covered by ops.FloorOp
    trunc_unit = sge.Identifier(this=op.freq)
    return sge.TimestampTrunc(this=expr.expr, unit=trunc_unit)
249+
250+
237251
@UNARY_OP_REGISTRATION.register(ops.floor_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """Compile floor to SQL FLOOR."""
    operand = expr.expr
    return sge.Floor(this=operand)
@@ -249,6 +263,26 @@ def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
249263
return sge.func("ST_ASTEXT", expr.expr)
250264

251265

266+
@UNARY_OP_REGISTRATION.register(ops.geo_st_boundary_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """Compile to ST_BOUNDARY over the geography expression."""
    geography = expr.expr
    return sge.func("ST_BOUNDARY", geography)
269+
270+
271+
@UNARY_OP_REGISTRATION.register(ops.geo_st_geogfromtext_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """Parse WKT text into a geography; SAFE. returns NULL on bad input."""
    wkt_text = expr.expr
    return sge.func("SAFE.ST_GEOGFROMTEXT", wkt_text)
274+
275+
276+
@UNARY_OP_REGISTRATION.register(ops.geo_st_isclosed_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """Compile to ST_ISCLOSED over the geography expression."""
    geography = expr.expr
    return sge.func("ST_ISCLOSED", geography)
279+
280+
281+
@UNARY_OP_REGISTRATION.register(ops.GeoStLengthOp)
def _(op: ops.GeoStLengthOp, expr: TypedExpr) -> sge.Expression:
    """Compile to ST_LENGTH over the geography expression."""
    geography = expr.expr
    return sge.func("ST_LENGTH", geography)
284+
285+
252286
@UNARY_OP_REGISTRATION.register(ops.geo_x_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """Extract the X coordinate; SAFE. returns NULL instead of erroring."""
    point = expr.expr
    return sge.func("SAFE.ST_X", point)
@@ -274,6 +308,11 @@ def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
274308
return sge.BitwiseNot(this=expr.expr)
275309

276310

311+
@UNARY_OP_REGISTRATION.register(ops.IsInOp)
def _(op: ops.IsInOp, expr: TypedExpr) -> sge.Expression:
    """Compile IsIn to a SQL IN list.

    SQL three-valued logic means `x IN (NULL)` never evaluates to TRUE, so
    NULL entries in op.values are matched with an explicit IS NULL check
    instead of being placed in the IN list. An empty value list would render
    as the invalid `IN ()`, so it compiles to a constant FALSE.
    TODO: decide whether float NaN values need special handling to match
    pandas `isin` semantics.
    """
    literals = [sge.convert(value) for value in op.values if value is not None]
    matches_null = any(value is None for value in op.values)

    if not literals:
        if matches_null:
            return sge.Is(this=expr.expr, expression=sge.Null())
        return sge.convert(False)

    in_expr = sge.In(this=expr.expr, expressions=literals)
    if matches_null:
        # Copy the operand so the two branches don't share one mutable node.
        null_check = sge.Is(this=expr.expr.copy(), expression=sge.Null())
        return sge.Or(this=in_expr, expression=null_check)
    return in_expr
314+
315+
277316
@UNARY_OP_REGISTRATION.register(ops.isalnum_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """True when the whole string is Unicode letters and/or digits."""
    alnum_pattern = sge.convert(r"^(\p{N}|\p{L})+$")
    return sge.RegexpLike(this=expr.expr, expression=alnum_pattern)
@@ -517,6 +556,26 @@ def _(op: ops.StrSliceOp, expr: TypedExpr) -> sge.Expression:
517556
)
518557

519558

559+
@UNARY_OP_REGISTRATION.register(ops.StrftimeOp)
def _(op: ops.StrftimeOp, expr: TypedExpr) -> sge.Expression:
    """Format a timestamp expression as a string via FORMAT_TIMESTAMP."""
    # NOTE(review): assumes op.date_format is already a BigQuery format
    # string rather than a Python strftime pattern — confirm with callers.
    format_literal = sge.convert(op.date_format)
    return sge.func("FORMAT_TIMESTAMP", format_literal, expr.expr)
562+
563+
564+
@UNARY_OP_REGISTRATION.register(ops.StructFieldOp)
def _(op: ops.StructFieldOp, expr: TypedExpr) -> sge.Expression:
    """Select one field of a STRUCT column, by name or positional index."""
    field_ref = op.name_or_index
    if not isinstance(field_ref, str):
        # Positional index: resolve it to the field's name through the
        # pyarrow struct type carried on the expression's dtype.
        arrow_dtype = typing.cast(pd.ArrowDtype, expr.dtype)
        struct_type = typing.cast(pa.StructType, arrow_dtype.pyarrow_dtype)
        field_ref = struct_type.field(field_ref).name

    return sge.Column(
        this=sge.to_identifier(field_ref, quoted=True),
        catalog=expr.expr,
    )
577+
578+
520579
@UNARY_OP_REGISTRATION.register(ops.tan_op)
def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
    """Compile tangent to the SQL TAN function."""
    operand = expr.expr
    return sge.func("TAN", operand)
@@ -537,6 +596,36 @@ def _(op: ops.base_ops.UnaryOp, expr: TypedExpr) -> sge.Expression:
537596
return sge.Floor(this=expr.expr)
538597

539598

599+
@UNARY_OP_REGISTRATION.register(ops.ToDatetimeOp)
def _(op: ops.ToDatetimeOp, expr: TypedExpr) -> sge.Expression:
    """Convert a numeric expression to a DATETIME value."""
    # NOTE(review): TIMESTAMP_SECONDS interprets the operand as Unix epoch
    # seconds — confirm upstream always scales the input to seconds first.
    as_timestamp = sge.func("TIMESTAMP_SECONDS", expr.expr)
    return sge.Cast(this=as_timestamp, to="DATETIME")
602+
603+
604+
@UNARY_OP_REGISTRATION.register(ops.ToTimestampOp)
def _(op: ops.ToTimestampOp, expr: TypedExpr) -> sge.Expression:
    """Convert a numeric expression to a TIMESTAMP value."""
    # NOTE(review): interprets the operand as Unix epoch seconds — confirm.
    epoch_seconds = expr.expr
    return sge.func("TIMESTAMP_SECONDS", epoch_seconds)
607+
608+
609+
@UNARY_OP_REGISTRATION.register(ops.ToTimedeltaOp)
def _(op: ops.ToTimedeltaOp, expr: TypedExpr) -> sge.Expression:
    """Build an INTERVAL from a numeric expression."""
    # NOTE(review): the unit is hard-coded to SECOND; if ops.ToTimedeltaOp
    # carries a unit attribute this should honor it — confirm.
    second_unit = sge.Identifier(this="SECOND")
    return sge.Interval(this=expr.expr, unit=second_unit)
612+
613+
614+
@UNARY_OP_REGISTRATION.register(ops.UnixMicros)
def _(op: ops.UnixMicros, expr: TypedExpr) -> sge.Expression:
    """Microseconds since the Unix epoch via UNIX_MICROS."""
    timestamp_operand = expr.expr
    return sge.func("UNIX_MICROS", timestamp_operand)
617+
618+
619+
@UNARY_OP_REGISTRATION.register(ops.UnixMillis)
def _(op: ops.UnixMillis, expr: TypedExpr) -> sge.Expression:
    """Milliseconds since the Unix epoch via UNIX_MILLIS."""
    timestamp_operand = expr.expr
    return sge.func("UNIX_MILLIS", timestamp_operand)
622+
623+
624+
@UNARY_OP_REGISTRATION.register(ops.UnixSeconds)
def _(op: ops.UnixSeconds, expr: TypedExpr) -> sge.Expression:
    """Seconds since the Unix epoch via UNIX_SECONDS."""
    timestamp_operand = expr.expr
    return sge.func("UNIX_SECONDS", timestamp_operand)
627+
628+
540629
# JSON Ops
541630
@UNARY_OP_REGISTRATION.register(ops.JSONExtract)
542631
def _(op: ops.JSONExtract, expr: TypedExpr) -> sge.Expression:

bigframes/pandas/io/api.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import functools
1818
import inspect
19+
import os
1920
import threading
2021
import typing
2122
from typing import (
@@ -56,6 +57,7 @@
5657
from bigframes.session import dry_runs
5758
import bigframes.session._io.bigquery
5859
import bigframes.session.clients
60+
import bigframes.session.metrics
5961

6062
# Note: the following methods are duplicated from Session. This duplication
6163
# enables the following:
@@ -625,6 +627,11 @@ def _get_bqclient() -> bigquery.Client:
625627

626628
def _dry_run(query, bqclient) -> bigquery.QueryJob:
    """Submit *query* as a BigQuery dry run and return the resulting job.

    Also records execution metrics when the metrics logging environment
    variable is set, so stats are captured even without a Session.
    """
    dry_run_job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))

    # Fix for b/435183833. Log metrics even if a Session isn't available.
    if bigframes.session.metrics.LOGGING_NAME_ENV_VAR in os.environ:
        bigframes.session.metrics.ExecutionMetrics().count_job_stats(dry_run_job)
    return dry_run_job
629636

630637

bigframes/session/metrics.py

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -40,32 +40,54 @@ def count_job_stats(
4040
):
4141
if query_job is None:
4242
assert row_iterator is not None
43-
total_bytes_processed = getattr(row_iterator, "total_bytes_processed", None)
44-
query = getattr(row_iterator, "query", None)
45-
if total_bytes_processed is None or query is None:
46-
return
43+
44+
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
45+
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0)
46+
query_char_count = len(getattr(row_iterator, "query", ""))
47+
slot_millis = getattr(row_iterator, "slot_millis", 0)
48+
exec_seconds = 0.0
4749

4850
self.execution_count += 1
49-
self.query_char_count += len(query)
50-
self.bytes_processed += total_bytes_processed
51-
write_stats_to_disk(len(query), total_bytes_processed)
52-
return
51+
self.query_char_count += query_char_count
52+
self.bytes_processed += bytes_processed
53+
self.slot_millis += slot_millis
54+
55+
elif query_job.configuration.dry_run:
56+
query_char_count = len(query_job.query)
5357

54-
if query_job.configuration.dry_run:
55-
write_stats_to_disk(len(query_job.query), 0, 0, 0)
58+
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
59+
bytes_processed = 0
60+
slot_millis = 0
61+
exec_seconds = 0.0
5662

57-
stats = get_performance_stats(query_job)
58-
if stats is not None:
59-
query_char_count, bytes_processed, slot_millis, execution_secs = stats
63+
elif (stats := get_performance_stats(query_job)) is not None:
64+
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
6065
self.execution_count += 1
6166
self.query_char_count += query_char_count
6267
self.bytes_processed += bytes_processed
6368
self.slot_millis += slot_millis
64-
self.execution_secs += execution_secs
69+
self.execution_secs += exec_seconds
6570
write_stats_to_disk(
66-
query_char_count, bytes_processed, slot_millis, execution_secs
71+
query_char_count=query_char_count,
72+
bytes_processed=bytes_processed,
73+
slot_millis=slot_millis,
74+
exec_seconds=exec_seconds,
6775
)
6876

77+
else:
78+
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
79+
bytes_processed = 0
80+
query_char_count = 0
81+
slot_millis = 0
82+
exec_seconds = 0
83+
84+
write_stats_to_disk(
85+
query_char_count=query_char_count,
86+
bytes_processed=bytes_processed,
87+
slot_millis=slot_millis,
88+
exec_seconds=exec_seconds,
89+
)
90+
6991

7092
def get_performance_stats(
7193
query_job: bigquery.QueryJob,
@@ -103,10 +125,11 @@ def get_performance_stats(
103125

104126

105127
def write_stats_to_disk(
128+
*,
106129
query_char_count: int,
107130
bytes_processed: int,
108-
slot_millis: Optional[int] = None,
109-
exec_seconds: Optional[float] = None,
131+
slot_millis: int,
132+
exec_seconds: float,
110133
):
111134
"""For pytest runs only, log information about the query job
112135
to a file in order to create a performance report.
@@ -118,18 +141,17 @@ def write_stats_to_disk(
118141
test_name = os.environ[LOGGING_NAME_ENV_VAR]
119142
current_directory = os.getcwd()
120143

121-
if (slot_millis is not None) and (exec_seconds is not None):
122-
# store slot milliseconds
123-
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
124-
with open(slot_file, "a") as f:
125-
f.write(str(slot_millis) + "\n")
144+
# store slot milliseconds
145+
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
146+
with open(slot_file, "a") as f:
147+
f.write(str(slot_millis) + "\n")
126148

127-
# store execution time seconds
128-
exec_time_file = os.path.join(
129-
current_directory, test_name + ".bq_exec_time_seconds"
130-
)
131-
with open(exec_time_file, "a") as f:
132-
f.write(str(exec_seconds) + "\n")
149+
# store execution time seconds
150+
exec_time_file = os.path.join(
151+
current_directory, test_name + ".bq_exec_time_seconds"
152+
)
153+
with open(exec_time_file, "a") as f:
154+
f.write(str(exec_seconds) + "\n")
133155

134156
# store length of query
135157
query_char_count_file = os.path.join(

tests/benchmark/read_gbq_colab/aggregate_output.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def aggregate_output(*, project_id, dataset_id, table_id):
4848

4949

5050
if __name__ == "__main__":
51-
config = utils.get_configuration(include_table_id=True)
51+
config = utils.get_configuration(include_table_id=True, start_session=False)
5252
current_path = pathlib.Path(__file__).absolute()
5353

5454
utils.get_execution_time(
@@ -58,5 +58,4 @@ def aggregate_output(*, project_id, dataset_id, table_id):
5858
project_id=config.project_id,
5959
dataset_id=config.dataset_id,
6060
table_id=config.table_id,
61-
session=config.session,
6261
)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`timestamp_col` AS `bfcol_0`
4+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
5+
), `bfcte_1` AS (
6+
SELECT
7+
*,
8+
TIMESTAMP_TRUNC(`bfcol_0`, DAY) AS `bfcol_1`
9+
FROM `bfcte_0`
10+
)
11+
SELECT
12+
`bfcol_1` AS `timestamp_col`
13+
FROM `bfcte_1`
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`geography_col` AS `bfcol_0`
4+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
5+
), `bfcte_1` AS (
6+
SELECT
7+
*,
8+
ST_BOUNDARY(`bfcol_0`) AS `bfcol_1`
9+
FROM `bfcte_0`
10+
)
11+
SELECT
12+
`bfcol_1` AS `geography_col`
13+
FROM `bfcte_1`
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`string_col` AS `bfcol_0`
4+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
5+
), `bfcte_1` AS (
6+
SELECT
7+
*,
8+
SAFE.ST_GEOGFROMTEXT(`bfcol_0`) AS `bfcol_1`
9+
FROM `bfcte_0`
10+
)
11+
SELECT
12+
`bfcol_1` AS `string_col`
13+
FROM `bfcte_1`
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`geography_col` AS `bfcol_0`
4+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
5+
), `bfcte_1` AS (
6+
SELECT
7+
*,
8+
ST_ISCLOSED(`bfcol_0`) AS `bfcol_1`
9+
FROM `bfcte_0`
10+
)
11+
SELECT
12+
`bfcol_1` AS `geography_col`
13+
FROM `bfcte_1`
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`geography_col` AS `bfcol_0`
4+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
5+
), `bfcte_1` AS (
6+
SELECT
7+
*,
8+
ST_LENGTH(`bfcol_0`) AS `bfcol_1`
9+
FROM `bfcte_0`
10+
)
11+
SELECT
12+
`bfcol_1` AS `geography_col`
13+
FROM `bfcte_1`
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
WITH `bfcte_0` AS (
2+
SELECT
3+
`int64_col` AS `bfcol_0`
4+
FROM `bigframes-dev`.`sqlglot_test`.`scalar_types`
5+
), `bfcte_1` AS (
6+
SELECT
7+
*,
8+
`bfcol_0` IN (1, 2, 3) AS `bfcol_1`
9+
FROM `bfcte_0`
10+
)
11+
SELECT
12+
`bfcol_1` AS `int64_col`
13+
FROM `bfcte_1`

0 commit comments

Comments
 (0)