13 changes: 13 additions & 0 deletions dbt_bigframes_integration/.dbt.yml
@@ -0,0 +1,13 @@
dbt_sample_project:
  outputs:
    dev: # The target environment name (e.g., dev, prod)
      compute_region: us-central1 # Region used for compute operations
      dataset: dbt_sample_dataset # BigQuery dataset where dbt will create models
      gcs_bucket: dbt_sample_bucket # GCS bucket to store output files
      location: US # BigQuery dataset location
      method: oauth # Authentication method
      priority: interactive # Job priority: "interactive" or "batch"
      project: bigframes-dev # GCP project ID
      threads: 1 # Number of threads dbt can use for running models in parallel
      type: bigquery # Specifies the dbt adapter
  target: dev # The default target environment
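  # A second target (e.g., prod) could be added alongside `dev` under
  # `outputs:`; the values below are hypothetical placeholders:
  #
  #   prod:
  #     type: bigquery
  #     method: oauth
  #     project: my-prod-project
  #     dataset: dbt_prod_dataset
  #     location: US
  #     threads: 4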
39 changes: 39 additions & 0 deletions dbt_bigframes_integration/dbt_sample_project/dbt_project.yml
@@ -0,0 +1,39 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models.
name: 'dbt_sample_project'
version: '1.0.0'

# This setting configures which "profile" dbt uses for this project.
profile: 'dbt_sample_project'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"


# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the `{{ config(...) }}` macro.
models:
  dbt_sample_project:
    # Optional: These settings (e.g., submission_method, notebook_template_id,
    # etc.) can also be defined directly in the Python model using dbt.config.
    submission_method: bigframes
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view
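    # A specific runtime template could also be pinned at this level (the ID
    # below is a hypothetical placeholder):
    #   notebook_template_id: "1234567890"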
@@ -0,0 +1,58 @@
# This example demonstrates a common, general-purpose pattern: transforming raw
# BigQuery data into a processed table using a dbt Python model with BigFrames.
# See more at: https://cloud.google.com/bigquery/docs/dataframes-dbt.
#
# Key defaults when using BigFrames in a dbt Python model for BigQuery:
# - The default materialization is 'table' unless specified otherwise. This
# means dbt will create a new BigQuery table from the result of this model.
# - The default timeout for the job is 3600 seconds (60 minutes). This can be
# adjusted if your processing requires more time.
# - If no runtime template is provided, dbt will automatically create and reuse
# a default one for executing the Python code in BigQuery.
#
# BigFrames provides a pandas-like API for BigQuery data, enabling familiar
# data manipulation directly within your dbt project. This code sample
# illustrates a basic pattern for:
# 1. Reading data from an existing BigQuery dataset.
# 2. Processing it using pandas-like DataFrame operations powered by BigFrames.
# 3. Outputting a cleaned and transformed table, managed by dbt.
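#
# For instance, the defaults above could be overridden explicitly via
# dbt.config (the template ID below is a hypothetical placeholder):
#
#     dbt.config(
#         submission_method="bigframes",
#         materialized="table",               # the default materialization
#         notebook_template_id="1234567890",  # pin a specific runtime template
#     )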


def model(dbt, session):
    # Optional: Override settings from your dbt_project.yml file.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    #
    # Use `dbt.config(submission_method="bigframes")` to tell dbt to execute
    # this Python model using BigQuery DataFrames (BigFrames). This allows you
    # to write pandas-like code that operates directly on BigQuery data
    # without needing to pull all data into memory.
    dbt.config(submission_method="bigframes")

    # Define the BigQuery table path from which to read data.
    table = "bigquery-public-data.epa_historical_air_quality.temperature_hourly_summary"

    # Define the specific columns to select from the BigQuery table.
    columns = ["state_name", "county_name", "date_local", "time_local", "sample_measurement"]

    # Read data from the specified BigQuery table into a BigFrames DataFrame.
    df = session.read_gbq(table, columns=columns)

    # Sort the DataFrame by the specified columns. This prepares the data for
    # `drop_duplicates` to ensure consistent duplicate removal.
    df = df.sort_values(columns).drop_duplicates(columns)

    # Group the DataFrame by 'state_name', 'county_name', and 'date_local'. For
    # each group, calculate the minimum and maximum of the 'sample_measurement'
    # column. The result will be a BigFrames DataFrame with a MultiIndex.
    result = (
        df.groupby(["state_name", "county_name", "date_local"])["sample_measurement"]
        .agg(["min", "max"])
    )
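
    # At this point `result` has a three-level index (state_name, county_name,
    # date_local) and two columns, roughly (sample values are hypothetical):
    #
    #   state_name  county_name  date_local    min   max
    #   Alabama     Baldwin      2020-01-01   41.0  66.0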

    # Rename some columns and convert the MultiIndex of the 'result' DataFrame
    # into regular columns. This flattens the DataFrame so 'state_name',
    # 'county_name', and 'date_local' become regular columns again.
    result = (
        result.rename(columns={'min': 'min_temperature', 'max': 'max_temperature'})
        .reset_index()
    )

    # Return the processed BigFrames DataFrame.
    # In a dbt Python model, this DataFrame will be materialized as a table.
    return result
@@ -0,0 +1,67 @@
# This example demonstrates how to build an **incremental dbt Python model**
# using BigFrames.
#
# Incremental models are essential for efficiently processing large datasets by
# only transforming new or changed data, rather than reprocessing the entire
# dataset every time. If the target table already exists, dbt will perform a
# merge based on the specified unique keys; otherwise, it will create a new
# table automatically.
#
# This model also showcases the definition and application of a **BigFrames
# User-Defined Function (UDF)** to add a descriptive summary column based on
# temperature data. BigFrames UDFs allow you to execute custom Python logic
# directly within BigQuery, leveraging BigQuery's scalability.
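#
# Conceptually, on an incremental run dbt issues a statement along these
# lines (a simplified sketch, not the adapter's exact SQL):
#
#   MERGE INTO <target_table> t
#   USING <model_output> s
#     ON  t.state_name  = s.state_name
#     AND t.county_name = s.county_name
#     AND t.date_local  = s.date_local
#   WHEN MATCHED THEN UPDATE SET ...
#   WHEN NOT MATCHED THEN INSERT ...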


import bigframes.pandas as bpd


def model(dbt, session):
    # Optional: override settings from dbt_project.yml.
    # When both are set, dbt.config takes precedence over dbt_project.yml.
    dbt.config(
        # Use BigFrames mode to execute this Python model. This enables
        # pandas-like operations directly on BigQuery data.
        submission_method="bigframes",
        # Materialize this model as an 'incremental' table. This tells dbt to
        # only process new or updated data on subsequent runs.
        materialized='incremental',
        # Use the MERGE strategy to update rows during incremental runs.
        incremental_strategy='merge',
        # Define the composite key that uniquely identifies a row in the
        # target table. This key is used by the 'merge' strategy to match
        # existing rows for updates during incremental runs.
        unique_key=["state_name", "county_name", "date_local"],
    )

    # Reference an upstream dbt model or an existing BigQuery table as a
    # BigFrames DataFrame. This allows you to seamlessly use the output of
    # another dbt model as input to this one.
    df = dbt.ref("dbt_bigframes_code_sample_1")
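
    # Optionally, work can be limited to recent rows on incremental runs.
    # dbt Python models expose `dbt.is_incremental`; the cutoff below is a
    # hypothetical placeholder:
    #
    #     if dbt.is_incremental:
    #         df = df[df["date_local"] >= "2024-01-01"]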

    # Define a BigFrames UDF to generate a temperature description.
    # BigFrames UDFs allow you to define custom Python logic that executes
    # directly within BigQuery. This is powerful for complex transformations.
    @bpd.udf(dataset='dbt_sample_dataset', name='describe_udf')
    def describe(
        max_temperature: float,
        min_temperature: float,
    ) -> str:
        is_hot = max_temperature > 85.0
        is_cold = min_temperature < 50.0

        if is_hot and is_cold:
            return "Expect both hot and cold conditions today."
        if is_hot:
            return "Overall, it's a hot day."
        if is_cold:
            return "Overall, it's a cold day."
        return "Comfortable throughout the day."

    # Apply the UDF elementwise with `Series.combine`, passing each row's
    # max/min temperature pair to `describe`, and store the result in a new
    # "describe" column.
    df["describe"] = df["max_temperature"].combine(df["min_temperature"], describe)

    # Return the transformed BigFrames DataFrame.
    # This DataFrame will be the final output of your incremental dbt model.
    # On subsequent runs, only new or changed rows will be processed and merged
    # into the target BigQuery table based on the `unique_key`.
    return df
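
# A typical workflow: the first `dbt run` creates the target table, and each
# subsequent `dbt run` merges new or updated rows into it by `unique_key`. To
# build only this model and its upstream dependency, a selector such as
# `dbt run --select +<this_model_name>` can be used (dbt derives the model
# name from this file's name).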