2727import functools
2828import itertools
2929import random
30- import textwrap
3130import typing
3231from typing import (
3332 Iterable ,
5554from bigframes .core import agg_expressions , local_data
5655import bigframes .core as core
5756import bigframes .core .agg_expressions as ex_types
58- import bigframes .core .compile .googlesql as googlesql
5957import bigframes .core .expression as ex
6058import bigframes .core .expression as scalars
6159import bigframes .core .guid as guid
6260import bigframes .core .identifiers
6361import bigframes .core .join_def as join_defs
6462import bigframes .core .ordering as ordering
6563import bigframes .core .pyarrow_utils as pyarrow_utils
66- import bigframes .core .schema as bf_schema
67- import bigframes .core .sql as sql
6864import bigframes .core .utils as utils
6965import bigframes .core .window_spec as windows
7066import bigframes .dtypes
@@ -2779,14 +2775,6 @@ def _throw_if_null_index(self, opname: str):
27792775 )
27802776
27812777 def _get_rows_as_json_values (self ) -> Block :
2782- # We want to preserve any ordering currently present before turning to
2783- # direct SQL manipulation. We will restore the ordering when we rebuild
2784- # expression.
2785- # TODO(shobs): Replace direct SQL manipulation by structured expression
2786- # manipulation
2787- expr , ordering_column_name = self .expr .promote_offsets ()
2788- expr_sql = self .session ._executor .to_sql (expr )
2789-
27902778 # Names of the columns to serialize for the row.
27912779 # We will use the repr-eval pattern to serialize a value here and
27922780 # deserialize in the cloud function. Let's make sure that would work.
@@ -2802,93 +2790,44 @@ def _get_rows_as_json_values(self) -> Block:
28022790 )
28032791
28042792 column_names .append (serialized_column_name )
2805- column_names_csv = sql .csv (map (sql .simple_literal , column_names ))
2806-
2807- # index columns count
2808- index_columns_count = len (self .index_columns )
28092793
28102794 # column references to form the array of values for the row
28112795 column_types = list (self .index .dtypes ) + list (self .dtypes )
28122796 column_references = []
28132797 for type_ , col in zip (column_types , self .expr .column_ids ):
2814- if isinstance (type_ , pd .ArrowDtype ) and pa .types .is_binary (
2815- type_ .pyarrow_dtype
2816- ):
2817- column_references .append (sql .to_json_string (col ))
2798+ if type_ == bigframes .dtypes .BYTES_DTYPE :
2799+ column_references .append (ops .ToJSONString ().as_expr (col ))
2800+ elif type_ == bigframes .dtypes .BOOL_DTYPE :
2801+ # cast operator produces True/False, but function template expects lower case
2802+ column_references .append (
2803+ ops .lower_op .as_expr (
2804+ ops .AsTypeOp (bigframes .dtypes .STRING_DTYPE ).as_expr (col )
2805+ )
2806+ )
28182807 else :
2819- column_references .append (sql .cast_as_string (col ))
2820-
2821- column_references_csv = sql .csv (column_references )
2822-
2823- # types of the columns to serialize for the row
2824- column_types_csv = sql .csv (
2825- [sql .simple_literal (str (typ )) for typ in column_types ]
2826- )
2808+ column_references .append (
2809+ ops .AsTypeOp (bigframes .dtypes .STRING_DTYPE ).as_expr (col )
2810+ )
28272811
28282812 # row dtype to use for deserializing the row as pandas series
28292813 pandas_row_dtype = bigframes .dtypes .lcd_type (* column_types )
28302814 if pandas_row_dtype is None :
28312815 pandas_row_dtype = "object"
2832- pandas_row_dtype = sql .simple_literal (str (pandas_row_dtype ))
2833-
2834- # create a json column representing row through SQL manipulation
2835- row_json_column_name = guid .generate_guid ()
2836- select_columns = (
2837- [ordering_column_name ] + list (self .index_columns ) + [row_json_column_name ]
2838- )
2839- select_columns_csv = sql .csv (
2840- [googlesql .identifier (col ) for col in select_columns ]
2841- )
2842- json_sql = f"""\
2843- With T0 AS (
2844- { textwrap .indent (expr_sql , " " )}
2845- ),
2846- T1 AS (
2847- SELECT *,
2848- TO_JSON_STRING(JSON_OBJECT(
2849- "names", [{ column_names_csv } ],
2850- "types", [{ column_types_csv } ],
2851- "values", [{ column_references_csv } ],
2852- "indexlength", { index_columns_count } ,
2853- "dtype", { pandas_row_dtype }
2854- )) AS { googlesql .identifier (row_json_column_name )} FROM T0
2855- )
2856- SELECT { select_columns_csv } FROM T1
2857- """
2858- # The only way this code is used is through the df.apply(axis=1) code path
2859- destination , query_job = self .session ._loader ._query_to_destination (
2860- json_sql , cluster_candidates = [ordering_column_name ]
2861- )
2862- if not destination :
2863- raise ValueError (f"Query job { query_job } did not produce result table" )
2864-
2865- new_schema = (
2866- self .expr .schema .select ([* self .index_columns ])
2867- .append (
2868- bf_schema .SchemaItem (
2869- row_json_column_name , bigframes .dtypes .STRING_DTYPE
2870- )
2871- )
2872- .append (
2873- bf_schema .SchemaItem (ordering_column_name , bigframes .dtypes .INT_DTYPE )
2874- )
2875- )
2816+ pandas_row_dtype = str (pandas_row_dtype )
28762817
2877- dest_table = self .session .bqclient .get_table (destination )
2878- expr = core .ArrayValue .from_table (
2879- dest_table ,
2880- schema = new_schema ,
2881- session = self .session ,
2882- offsets_col = ordering_column_name ,
2883- n_rows = dest_table .num_rows ,
2884- ).drop_columns ([ordering_column_name ])
2885- block = Block (
2886- expr ,
2887- index_columns = self .index_columns ,
2888- column_labels = [row_json_column_name ],
2889- index_labels = self ._index_labels ,
2818+ struct_op = ops .StructOp (
2819+ column_names = ("names" , "types" , "values" , "indexlength" , "dtype" )
28902820 )
2891- return block
2821+ names_val = ex .const (tuple (column_names ))
2822+ types_val = ex .const (tuple (map (str , column_types )))
2823+ values_val = ops .ToArrayOp ().as_expr (* column_references )
2824+ indexlength_val = ex .const (len (self .index_columns ))
2825+ dtype_val = ex .const (str (pandas_row_dtype ))
2826+ struct_expr = struct_op .as_expr (
2827+ names_val , types_val , values_val , indexlength_val , dtype_val
2828+ )
2829+ block , col_id = self .project_expr (ops .ToJSONString ().as_expr (struct_expr ))
2830+ return block .select_column (col_id )
28922831
28932832
28942833class BlockIndexProperties :
0 commit comments