Defining Serverless version for notebook jobs #1277

@ikhudur

Description

Describe the bug

I cannot define the version of the serverless cluster to use when running a dbt Python model. I have tried several different approaches, but it always defaults to environment version 1.

The issue seems to be in how the fields are defined, and I have identified possible locations where the code should be updated (I am, however, not sure why the code was written as it is, so I might be missing something).

Issues related to this:
#1009
#1055
#1195

Steps To Reproduce

Create a simple dbt Python model (it can just return some values). Try to set up the YAML config to define the environment version; here is an example:

version: 2
models:
  - name: my_model
    config:
      materialized: table
      submission_method: serverless_cluster
      environments:
        - environment_key: default
          spec:
            environment_version: "3"
      python_job_config:
        name: my_model_single_run
        additional_task_settings: {"environment_key": "default"}
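
For reference, a minimal model file to go with this config could look like the sketch below (the file name and the returned values are just an illustration, not taken from my actual project):

# models/my_model.py -- hypothetical minimal model used to reproduce the issue
def model(dbt, session):
    dbt.config(materialized="table", submission_method="serverless_cluster")
    # Return a trivial DataFrame so the model materializes something.
    return session.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])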

Expected behavior

The single notebook job run that is created and submitted uses serverless version 3.

Screenshots and log output

(Screenshot omitted.)

System information

The output of dbt --version:

dbt Cloud CLI - 0.40.7 (3aa8c1ef8d89fce3bf5750f1a37255a5321f0a5a 2025-10-06T18:56:38Z)

The operating system you're using:
WSL2 Ubuntu-24.04

The output of python --version:

Additional context

I think the issue with the code starts with the class PythonJobConfig here:

import uuid
from typing import Any, Optional

from pydantic import BaseModel, Field, validator

from .util import PYDANTIC_IS_V1

DEFAULT_TIMEOUT = 60 * 60 * 24
JOB_PERMISSIONS = {"CAN_VIEW", "CAN_MANAGE_RUN", "CAN_MANAGE"}
NOTEBOOK_PERMISSIONS = {"CAN_READ", "CAN_RUN", "CAN_EDIT", "CAN_MANAGE"}


class PythonJobConfig(BaseModel):
    """Pydantic model for config found in python_job_config."""

    name: Optional[str] = None
    grants: dict[str, list[dict[str, str]]] = Field(exclude=True, default_factory=dict)
    existing_job_id: str = Field("", exclude=True)
    post_hook_tasks: list[dict[str, Any]] = Field(exclude=True, default_factory=list)
    additional_task_settings: dict[str, Any] = Field(exclude=True, default_factory=dict)

    class Config:
        extra = "allow"


class PythonModelConfig(BaseModel):
    """
    Pydantic model for a Python model configuration.
    Includes some job-specific settings that are not yet part of PythonJobConfig.
    """

    user_folder_for_python: bool = False
    timeout: int = Field(DEFAULT_TIMEOUT, gt=0)
    job_cluster_config: dict[str, Any] = Field(default_factory=dict)
    access_control_list: list[dict[str, str]] = Field(default_factory=list)
    notebook_access_control_list: list[dict[str, str]] = Field(default_factory=list)
    packages: list[str] = Field(default_factory=list)
    index_url: Optional[str] = None
    additional_libs: list[dict[str, Any]] = Field(default_factory=list)
    python_job_config: PythonJobConfig = Field(default_factory=lambda: PythonJobConfig(**{}))
    cluster_id: Optional[str] = None
    http_path: Optional[str] = None
    create_notebook: bool = False
    environment_key: Optional[str] = None
    environment_dependencies: list[str] = Field(default_factory=list)

    @validator("access_control_list")
    def validate_job_permissions(cls, v: list[dict[str, str]]) -> list[dict[str, str]]:
        for acl in v:
            if "permission_level" not in acl:
                raise ValueError("permission_level is required in access_control_list")
            if acl["permission_level"] not in JOB_PERMISSIONS:
                raise ValueError(
                    f"Invalid permission_level in access_control_list: {acl['permission_level']}. "
                    f"Must be one of {JOB_PERMISSIONS}"
                )
        return v

    @validator("notebook_access_control_list")
    def validate_notebook_permissions(cls, v: list[dict[str, str]]) -> list[dict[str, str]]:
        for acl in v:
            if "permission_level" not in acl:
                raise ValueError("permission_level is required in notebook_access_control_list")
            if acl["permission_level"] not in NOTEBOOK_PERMISSIONS:
                raise ValueError(
                    f"Invalid permission_level in notebook_access_control_list: "
                    f"{acl['permission_level']}. Must be one of {NOTEBOOK_PERMISSIONS}"
                )
        return v


class ParsedPythonModel(BaseModel):
    """Pydantic model for a Python model parsed from a dbt manifest"""

    catalog: str = Field("hive_metastore", alias="database")
    # Schema is a reserved name in Pydantic
    schema_: str = Field("default", alias="schema")
    identifier: str = Field(alias="alias")
    config: PythonModelConfig

    @property
    def run_name(self) -> str:
        return f"{self.catalog}-{self.schema_}-{self.identifier}-{uuid.uuid4()}"

    class Config:
        if PYDANTIC_IS_V1:
            allow_population_by_field_name = True
        else:
            populate_by_name = True

It is missing a definition for the environments field, which appears in the Databricks REST API.

Furthermore, in the part of the code that assembles the PythonJobDetails, you can see that it only sets the environments field IF environment_deps is set AND it cannot find an environments field in python_job_config (which it never will, as that field is not defined on the PythonJobConfig class):

class PythonJobConfigCompiler:
    """Compiles a Python model into a job configuration for Databricks."""

    def __init__(
        self,
        api_client: DatabricksApiClient,
        permission_builder: PythonPermissionBuilder,
        parsed_model: ParsedPythonModel,
        cluster_spec: dict[str, Any],
    ) -> None:
        self.api_client = api_client
        self.permission_builder = permission_builder
        self.access_control_list = parsed_model.config.access_control_list
        self.run_name = parsed_model.run_name
        packages = parsed_model.config.packages
        index_url = parsed_model.config.index_url
        additional_libraries = parsed_model.config.additional_libs
        library_config = get_library_config(packages, index_url, additional_libraries)
        self.cluster_spec = {**cluster_spec, **library_config}
        self.job_grants = parsed_model.config.python_job_config.grants
        self.additional_job_settings = parsed_model.config.python_job_config.dict()
        self.environment_key = parsed_model.config.environment_key
        self.environment_deps = parsed_model.config.environment_dependencies

    def compile(self, path: str) -> PythonJobDetails:
        job_spec: dict[str, Any] = {
            "task_key": "inner_notebook",
            "notebook_task": {
                "notebook_path": path,
            },
        }
        additional_job_config = self.additional_job_settings
        if self.environment_key:
            job_spec["environment_key"] = self.environment_key
            if self.environment_deps and not self.additional_job_settings.get("environments"):
                additional_job_config["environments"] = [
                    {
                        "environment_key": self.environment_key,
                        "spec": {"client": "2", "dependencies": self.environment_deps},
                    }
                ]
        job_spec.update(self.cluster_spec)
        access_control_list = self.permission_builder.build_job_permissions(
            self.job_grants, self.access_control_list
        )
        if access_control_list:
            job_spec["access_control_list"] = access_control_list
        job_spec["queue"] = {"enabled": True}
        return PythonJobDetails(
            run_name=self.run_name, job_spec=job_spec, additional_job_config=additional_job_config
        )

And here there are three issues:

1. For me, the submitted run still defaults to environment version 1.
2. Serverless version 2 is hardcoded.
3. The deprecated client field is used (see the Databricks REST API documentation):

"spec": {"client": "2", "dependencies": self.environment_deps},

So, my hunch is that a quick fix is to do something like this:

class PythonJobConfig(BaseModel):
    """Pydantic model for config found in python_job_config."""

    name: Optional[str] = None
    grants: dict[str, list[dict[str, str]]] = Field(exclude=True, default_factory=dict)
    existing_job_id: str = Field("", exclude=True)
    environments: Optional[list[dict[str, Any]]] = None  # Add this field
    post_hook_tasks: list[dict[str, Any]] = Field(exclude=True, default_factory=list)
    additional_task_settings: dict[str, Any] = Field(exclude=True, default_factory=dict)

    class Config:
        extra = "allow"

But I am not entirely sure.
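
For completeness, honoring that new field would presumably also need a small change in PythonJobConfigCompiler.compile, so that a user-supplied environments list wins and the generated one is only a fallback. A rough, untested sketch of what I mean (keeping "2" as the fallback version only because that is what the current code hardcodes):

if self.environment_key:
    job_spec["environment_key"] = self.environment_key
    # Prefer an environments list supplied via python_job_config;
    # only generate one as a fallback from environment_dependencies.
    if not additional_job_config.get("environments") and self.environment_deps:
        additional_job_config["environments"] = [
            {
                "environment_key": self.environment_key,
                "spec": {
                    # environment_version instead of the deprecated client field
                    "environment_version": "2",
                    "dependencies": self.environment_deps,
                },
            }
        ]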
