diff --git a/sqlmesh/core/engine_adapter/bigquery.py b/sqlmesh/core/engine_adapter/bigquery.py index 00b33f67a5..0dfa2325e8 100644 --- a/sqlmesh/core/engine_adapter/bigquery.py +++ b/sqlmesh/core/engine_adapter/bigquery.py @@ -169,17 +169,18 @@ def _df_to_source_queries( ) def query_factory() -> Query: - if bigframes_pd and isinstance(df, bigframes_pd.DataFrame): - df.to_gbq( + ordered_df = df[list(source_columns_to_types)] + if bigframes_pd and isinstance(ordered_df, bigframes_pd.DataFrame): + ordered_df.to_gbq( f"{temp_bq_table.project}.{temp_bq_table.dataset_id}.{temp_bq_table.table_id}", if_exists="replace", ) elif not self.table_exists(temp_table): # Make mypy happy - assert isinstance(df, pd.DataFrame) + assert isinstance(ordered_df, pd.DataFrame) self._db_call(self.client.create_table, table=temp_bq_table, exists_ok=False) result = self.__load_pandas_to_table( - temp_bq_table, df, source_columns_to_types, replace=False + temp_bq_table, ordered_df, source_columns_to_types, replace=False ) if result.errors: raise SQLMeshError(result.errors) diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 355fb9719c..9c27b45115 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -378,6 +378,8 @@ def query_factory() -> Query: elif isinstance(df, pd.DataFrame): from snowflake.connector.pandas_tools import write_pandas + ordered_df = df[list(source_columns_to_types)] + # Workaround for https://github.com/snowflakedb/snowflake-connector-python/issues/1034 # The above issue has already been fixed upstream, but we keep the following # line anyway in order to support a wider range of Snowflake versions. @@ -388,16 +390,16 @@ def query_factory() -> Query: # See: https://stackoverflow.com/a/75627721 for column, kind in source_columns_to_types.items(): - if is_datetime64_any_dtype(df.dtypes[column]): + if is_datetime64_any_dtype(ordered_df.dtypes[column]): if kind.is_type("date"): # type: ignore - df[column] = pd.to_datetime(df[column]).dt.date # type: ignore - elif getattr(df.dtypes[column], "tz", None) is not None: # type: ignore - df[column] = pd.to_datetime(df[column]).dt.strftime( + ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.date # type: ignore + elif getattr(ordered_df.dtypes[column], "tz", None) is not None: # type: ignore + ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime( "%Y-%m-%d %H:%M:%S.%f%z" ) # type: ignore # https://github.com/snowflakedb/snowflake-connector-python/issues/1677 else: # type: ignore - df[column] = pd.to_datetime(df[column]).dt.strftime( + ordered_df[column] = pd.to_datetime(ordered_df[column]).dt.strftime( "%Y-%m-%d %H:%M:%S.%f" ) # type: ignore @@ -407,7 +409,7 @@ def query_factory() -> Query: write_pandas( self._connection_pool.get(), - df, + ordered_df, temp_table.name, schema=temp_table.db or None, database=database.sql(dialect=self.dialect) if database else None, diff --git a/tests/core/engine_adapter/test_bigquery.py b/tests/core/engine_adapter/test_bigquery.py index 4328fa8923..f195bbaa2a 100644 --- a/tests/core/engine_adapter/test_bigquery.py +++ b/tests/core/engine_adapter/test_bigquery.py @@ -487,7 +487,13 @@ def temp_table_exists(table: exp.Table) -> bool: retry_resp_call.errors = None retry_mock.return_value = retry_resp db_call_mock.return_value = AttributeDict({"errors": None}) - df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) + df = pd.DataFrame( + { + "id": [1, 2, 3], + "ts": ["2025-01-01 00:00:00", "2025-01-01 00:00:00", "2025-01-01 00:00:00"], + "val": [7, 8, 9], + } + ) adapter.merge( target_table="target", source_table=df,