Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions sqlmesh/core/engine_adapter/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,26 @@ def _build_column_defs(
return super()._build_column_defs(
target_columns_to_types, column_descriptions, is_view, materialized
)

def columns(
    self, table_name: TableName, include_pseudo_columns: bool = False
) -> t.Dict[str, exp.DataType]:
    """Return a mapping of column name to ``exp.DataType`` for the given table.

    Looks the table up in Databricks' ``system.information_schema.columns``
    view, filtering on table name, schema, and catalog, and preserves the
    table's declared column order via ``ordinal_position``.

    Args:
        table_name: The table to describe; may be qualified with schema/catalog.
        include_pseudo_columns: Accepted for interface compatibility; not used
            by this implementation.
    """
    target = exp.to_table(table_name)

    # When the table reference carries no explicit catalog, fall back to the
    # session's current catalog so the information_schema filter still matches.
    catalog = target.catalog or self.get_current_catalog()

    filters = exp.and_(
        exp.column("table_name").eq(target.name),
        exp.column("table_schema").eq(target.db),
        exp.column("table_catalog").eq(catalog),
    )
    query = (
        exp.select("columns.column_name", "columns.full_data_type")
        .from_("system.information_schema.columns")
        .where(filters)
        .order_by("ordinal_position ASC")
    )

    rows = self.cursor.fetchall(query)

    # full_data_type is Databricks SQL text (e.g. "struct<a:int>"); parse it
    # with this adapter's dialect so nested/complex types round-trip.
    return dict(
        (row[0], exp.DataType.build(row[1], dialect=self.dialect)) for row in rows
    )
58 changes: 58 additions & 0 deletions tests/core/engine_adapter/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,3 +526,61 @@ def test_drop_data_object_materialized_view_calls_correct_drop(mocker: MockFixtu
drop_view_mock.assert_called_once_with(
mv_data_object.to_table(), ignore_if_not_exists=True, materialized=True
)


def test_columns(mocker: MockFixture, make_mocked_engine_adapter: t.Callable):
    """``columns`` should query information_schema with the current catalog and
    parse every returned ``full_data_type`` string with the Databricks dialect,
    including nested and very long struct types."""
    adapter = make_mocked_engine_adapter(DatabricksEngineAdapter, default_catalog="test_catalog")

    # The table reference below has no catalog, so the adapter must consult
    # get_current_catalog; mock it and assert it was actually called.
    current_catalog_mock = mocker.patch.object(
        adapter, "get_current_catalog", return_value="test_catalog"
    )
    # A struct wide enough to exercise parsing of long type strings.
    long_struct_cols = [f"a_{i}:int" for i in range(50)]
    adapter.cursor.fetchall.return_value = [
        ("bigint_col", "bigint"),
        ("binary_col", "binary"),
        ("boolean_col", "boolean"),
        ("date_col", "date"),
        ("decimal_col", "decimal(38,4)"),
        ("double_col", "double"),
        ("float_col", "float"),
        ("int_col", "int"),
        ("small_int", "smallint"),
        ("string_col", "string"),
        ("timestamp_col", "timestamp"),
        ("timestamp_ntz_col", "timestamp_ntz"),
        ("tinyint_col", "tinyint"),
        ("array_col", "array<int>"),
        ("simple_struct_col", "struct<a:int,b:string>"),
        ("long_struct_col", f"struct<{','.join(long_struct_cols)}>"),
    ]

    resp = adapter.columns("test_db.test_table")
    assert resp == {
        "bigint_col": exp.DataType.build("bigint", dialect=adapter.dialect),
        "binary_col": exp.DataType.build("binary", dialect=adapter.dialect),
        "boolean_col": exp.DataType.build("boolean", dialect=adapter.dialect),
        "date_col": exp.DataType.build("date", dialect=adapter.dialect),
        "decimal_col": exp.DataType.build("decimal(38,4)", dialect=adapter.dialect),
        "double_col": exp.DataType.build("double", dialect=adapter.dialect),
        "float_col": exp.DataType.build("float", dialect=adapter.dialect),
        "int_col": exp.DataType.build("int", dialect=adapter.dialect),
        "small_int": exp.DataType.build("smallint", dialect=adapter.dialect),
        "string_col": exp.DataType.build("string", dialect=adapter.dialect),
        "timestamp_col": exp.DataType.build("timestamp", dialect=adapter.dialect),
        "timestamp_ntz_col": exp.DataType.build("timestamp_ntz", dialect=adapter.dialect),
        "tinyint_col": exp.DataType.build("tinyint", dialect=adapter.dialect),
        "array_col": exp.DataType.build("array<int>", dialect=adapter.dialect),
        "simple_struct_col": exp.DataType.build("struct<a:int,b:string>", dialect=adapter.dialect),
        "long_struct_col": exp.DataType.build(
            f"struct<{','.join(long_struct_cols)}>", dialect=adapter.dialect
        ),
    }

    # The catalog fallback path must have been exercised (previously the mock
    # was created but never verified).
    current_catalog_mock.assert_called_once()

    adapter.cursor.fetchall.assert_called_once_with(
        parse_one(
            """SELECT columns.column_name, columns.full_data_type FROM system.information_schema.columns WHERE table_name = 'test_table' AND table_schema = 'test_db' AND table_catalog = 'test_catalog' ORDER BY ordinal_position ASC""",
            dialect="databricks",
        )
    )