Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions samples/dbt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ It includes basic configurations and sample models to help you get started quick
- `dbt_project.yml`: configures your dbt project - **dbt_sample_project**.
- `dbt_bigframes_code_sample_1.py`: An example to read BigQuery data and perform basic transformation.
- `dbt_bigframes_code_sample_2.py`: An example to build an incremental model that leverages BigFrames UDF capabilities.
- `prepare_table.py`: An ML example to consolidate various data sources into a single, unified table for later usage.
- `prediction.py`: An ML example to train models and then generate predictions using the prepared table.

## Requirements

Expand Down
67 changes: 67 additions & 0 deletions samples/dbt/dbt_sample_project/models/ml_example/prediction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This DBT Python model prepares and trains a machine learning model to predict
Comment thread
jialuoo marked this conversation as resolved.
# ozone levels.
# 1. Data Preparation: The model first gets a prepared dataset and splits it
# into three subsets based on the year: training data (before 2017),
# testing data (2017-2019), and prediction data (2020 and later).
# 2. Model Training: It then uses the LinearRegression model from BigFrames
# ML library. The model is trained on the historical data, using other
# atmospheric parameters to predict the 'o3' (ozone) levels.
# 3. Prediction: Finally, the trained model makes predictions on the most
# recent data (from 2020 onwards) and returns the resulting DataFrame of
# predicted ozone values.
#
# See more details from the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes
Comment thread
jialuoo marked this conversation as resolved.


def model(dbt, session):
dbt.config(submission_method="bigframes", timeout=6000)

df = dbt.ref("prepare_table")

# Define the rules for separating the training, test and prediction data.
train_data_filter = (df.date_local.dt.year < 2017)
test_data_filter = (
(df.date_local.dt.year >= 2017) & (df.date_local.dt.year < 2020)
)
predict_data_filter = (df.date_local.dt.year >= 2020)

# Define index_columns again here in prediction.
index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"]

# Separate the training, test and prediction data.
df_train = df[train_data_filter].set_index(index_columns)
df_test = df[test_data_filter].set_index(index_columns)
df_predict = df[predict_data_filter].set_index(index_columns)

# Finalize the training dataframe.
X_train = df_train.drop(columns="o3")
y_train = df_train["o3"]

# Finalize the prediction dataframe.
X_predict = df_predict.drop(columns="o3")

# Import the LinearRegression model from bigframes.ml module.
from bigframes.ml.linear_model import LinearRegression

# Train the model.
model = LinearRegression()
model.fit(X_train, y_train)

# Make the prediction using the model.
df_pred = model.predict(X_predict)

return df_pred
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This DBT Python model processes EPA historical air quality data from BigQuery
# using BigFrames. The primary goal is to merge several hourly summary
# tables into a single, unified DataFrame for later prediction. It includes the
# following steps:
# 1. Reading and Cleaning: It reads individual hourly summary tables from
# BigQuery for various atmospheric parameters (like CO, O3, temperature,
# and wind speed). Each table is cleaned by sorting, removing duplicates,
# and renaming columns for clarity.
# 2. Combining Data: It then merges these cleaned tables into a single,
# comprehensive DataFrame. An inner join is used to ensure the final output
# only includes records with complete data across all parameters.
# 3. Final Output: The unified DataFrame is returned as the model's output,
# creating a corresponding BigQuery table for future use.
#
# See more details from the related blog post: https://docs.getdbt.com/blog/train-linear-dbt-bigframes


import bigframes.pandas as bpd

def model(dbt, session):
# Optional: override settings from dbt_project.yml.
# When both are set, dbt.config takes precedence over dbt_project.yml.
dbt.config(submission_method="bigframes", timeout=6000)

# Define the dataset and the columns of interest representing various parameters
# in the atmosphere.
dataset = "bigquery-public-data.epa_historical_air_quality"
index_columns = ["state_name", "county_name", "site_num", "date_local", "time_local"]
param_column = "parameter_name"
value_column = "sample_measurement"

# Initialize a list for collecting dataframes from individual parameters.
params_dfs = []

# Collect dataframes from tables which contain data for single parameter.
table_param_dict = {
"co_hourly_summary" : "co",
"no2_hourly_summary" : "no2",
"o3_hourly_summary" : "o3",
"pressure_hourly_summary" : "pressure",
"so2_hourly_summary" : "so2",
"temperature_hourly_summary" : "temperature",
}

for table, param in table_param_dict.items():
param_df = bpd.read_gbq(
f"{dataset}.{table}",
columns=index_columns + [value_column]
)
param_df = param_df\
.sort_values(index_columns)\
.drop_duplicates(index_columns)\
.set_index(index_columns)\
.rename(columns={value_column : param})
params_dfs.append(param_df)

# Collect dataframes from the table containing wind speed.
# Optionally: collect dataframes from other tables containing
# wind direction, NO, NOx, and NOy data as needed.
wind_table = f"{dataset}.wind_hourly_summary"
bpd.read_gbq(wind_table, columns=[param_column]).value_counts()

wind_speed_df = bpd.read_gbq(
wind_table,
columns=index_columns + [value_column],
filters=[(param_column, "==", "Wind Speed - Resultant")]
)
wind_speed_df = wind_speed_df\
.sort_values(index_columns)\
.drop_duplicates(index_columns)\
.set_index(index_columns)\
.rename(columns={value_column: "wind_speed"})
params_dfs.append(wind_speed_df)

# Combine data for all the selected parameters.
df = bpd.concat(params_dfs, axis=1, join="inner").cache()
Comment thread
jialuoo marked this conversation as resolved.
Outdated
df = df.reset_index()

return df