Skip to content

Commit ca9588e

Browse files
authored
feat: Databricks grants (#5436)
1 parent b8f4a29 commit ca9588e

File tree

4 files changed

+334
-31
lines changed

4 files changed

+334
-31
lines changed

sqlmesh/core/engine_adapter/databricks.py

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from functools import partial
66

77
from sqlglot import exp
8+
89
from sqlmesh.core.dialect import to_schema
910
from sqlmesh.core.engine_adapter.shared import (
1011
CatalogSupport,
@@ -23,7 +24,7 @@
2324
import pandas as pd
2425

2526
from sqlmesh.core._typing import SchemaName, TableName, SessionProperties
26-
from sqlmesh.core.engine_adapter._typing import DF, PySparkSession, Query
27+
from sqlmesh.core.engine_adapter._typing import DF, PySparkSession, Query, GrantsConfig, DCL
2728

2829
logger = logging.getLogger(__name__)
2930

@@ -34,6 +35,7 @@ class DatabricksEngineAdapter(SparkEngineAdapter):
3435
SUPPORTS_CLONING = True
3536
SUPPORTS_MATERIALIZED_VIEWS = True
3637
SUPPORTS_MATERIALIZED_VIEW_SCHEMA = True
38+
SUPPORTS_GRANTS = True
3739
SCHEMA_DIFFER_KWARGS = {
3840
"support_positional_add": True,
3941
"nested_support": NestedSupport.ALL,
@@ -149,6 +151,109 @@ def spark(self) -> PySparkSession:
149151
def catalog_support(self) -> CatalogSupport:
150152
return CatalogSupport.FULL_SUPPORT
151153

154+
@staticmethod
155+
def _grant_object_kind(table_type: DataObjectType) -> str:
156+
if table_type == DataObjectType.VIEW:
157+
return "VIEW"
158+
if table_type == DataObjectType.MATERIALIZED_VIEW:
159+
return "MATERIALIZED VIEW"
160+
return "TABLE"
161+
162+
def _dcl_grants_config_expr(
163+
self,
164+
dcl_cmd: t.Type[DCL],
165+
table: exp.Table,
166+
grant_config: GrantsConfig,
167+
table_type: DataObjectType = DataObjectType.TABLE,
168+
) -> t.List[exp.Expression]:
169+
expressions: t.List[exp.Expression] = []
170+
if not grant_config:
171+
return expressions
172+
173+
object_kind = self._grant_object_kind(table_type)
174+
for privilege, principals in grant_config.items():
175+
for principal in principals:
176+
args: t.Dict[str, t.Any] = {
177+
"privileges": [exp.GrantPrivilege(this=exp.Var(this=privilege))],
178+
"securable": table.copy(),
179+
"principals": [exp.to_identifier(principal.lower())],
180+
}
181+
182+
if object_kind:
183+
args["kind"] = exp.Var(this=object_kind)
184+
185+
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
186+
187+
return expressions
188+
189+
def _apply_grants_config_expr(
190+
self,
191+
table: exp.Table,
192+
grant_config: GrantsConfig,
193+
table_type: DataObjectType = DataObjectType.TABLE,
194+
) -> t.List[exp.Expression]:
195+
return self._dcl_grants_config_expr(exp.Grant, table, grant_config, table_type)
196+
197+
def _revoke_grants_config_expr(
198+
self,
199+
table: exp.Table,
200+
grant_config: GrantsConfig,
201+
table_type: DataObjectType = DataObjectType.TABLE,
202+
) -> t.List[exp.Expression]:
203+
return self._dcl_grants_config_expr(exp.Revoke, table, grant_config, table_type)
204+
205+
def _get_current_grants_config(self, table: exp.Table) -> GrantsConfig:
206+
if schema_identifier := table.args.get("db"):
207+
schema_name = schema_identifier.this
208+
else:
209+
schema_name = self.get_current_database()
210+
if catalog_identifier := table.args.get("catalog"):
211+
catalog_name = catalog_identifier.this
212+
else:
213+
catalog_name = self.get_current_catalog()
214+
table_name = table.args.get("this").this # type: ignore
215+
216+
grant_expr = (
217+
exp.select("privilege_type", "grantee")
218+
.from_(
219+
exp.table_(
220+
"table_privileges",
221+
db="information_schema",
222+
catalog=catalog_name,
223+
)
224+
)
225+
.where(
226+
exp.and_(
227+
exp.column("table_catalog").eq(exp.Literal.string(catalog_name.lower())),
228+
exp.column("table_schema").eq(exp.Literal.string(schema_name.lower())),
229+
exp.column("table_name").eq(exp.Literal.string(table_name.lower())),
230+
exp.column("grantor").eq(exp.func("current_user")),
231+
exp.column("grantee").neq(exp.func("current_user")),
232+
# We only care about explicitly granted privileges and not inherited ones
233+
# if this is removed you would see grants inherited from the catalog get returned
234+
exp.column("inherited_from").eq(exp.Literal.string("NONE")),
235+
)
236+
)
237+
)
238+
239+
results = self.fetchall(grant_expr)
240+
241+
grants_dict: GrantsConfig = {}
242+
for privilege_raw, grantee_raw in results:
243+
if privilege_raw is None or grantee_raw is None:
244+
continue
245+
246+
privilege = str(privilege_raw)
247+
grantee = str(grantee_raw)
248+
if not privilege or not grantee:
249+
continue
250+
251+
grantees = grants_dict.setdefault(privilege, [])
252+
if grantee not in grantees:
253+
grantees.append(grantee)
254+
255+
return grants_dict
256+
152257
def _begin_session(self, properties: SessionProperties) -> t.Any:
153258
"""Begin a new session."""
154259
# Align the different possible connectors to a single catalog

tests/core/engine_adapter/integration/__init__.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -746,17 +746,25 @@ def upsert_sql_model(self, model_definition: str) -> t.Tuple[Context, SqlModel]:
746746
self._context.upsert_model(model)
747747
return self._context, model
748748

749-
def _get_create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str:
749+
def _get_create_user_or_role(
750+
self, username: str, password: t.Optional[str] = None
751+
) -> t.Tuple[str, t.Optional[str]]:
750752
password = password or random_id()
751753
if self.dialect == "postgres":
752-
return f"CREATE USER \"{username}\" WITH PASSWORD '{password}'"
754+
return username, f"CREATE USER \"{username}\" WITH PASSWORD '{password}'"
753755
if self.dialect == "snowflake":
754-
return f"CREATE ROLE {username}"
756+
return username, f"CREATE ROLE {username}"
757+
if self.dialect == "databricks":
758+
# Creating an account-level group in Databricks requires making REST API calls so we are going to
759+
# use a pre-created group instead. We assume the suffix on the name is the unique id
760+
return "_".join(username.split("_")[:-1]), None
755761
raise ValueError(f"User creation not supported for dialect: {self.dialect}")
756762

757-
def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> None:
758-
create_user_sql = self._get_create_user_or_role(username, password)
759-
self.engine_adapter.execute(create_user_sql)
763+
def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str:
764+
username, create_user_sql = self._get_create_user_or_role(username, password)
765+
if create_user_sql:
766+
self.engine_adapter.execute(create_user_sql)
767+
return username
760768

761769
@contextmanager
762770
def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]]:
@@ -769,7 +777,7 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
769777
self.add_test_suffix(f"test_{role_name}"), dialect=self.dialect
770778
).sql(dialect=self.dialect)
771779
password = random_id()
772-
self._create_user_or_role(user_name, password)
780+
user_name = self._create_user_or_role(user_name, password)
773781
created_users.append(user_name)
774782
roles[role_name] = user_name
775783

@@ -779,6 +787,18 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
779787
for user_name in created_users:
780788
self._cleanup_user_or_role(user_name)
781789

790+
def get_insert_privilege(self) -> str:
791+
if self.dialect == "databricks":
792+
# This would really be "MODIFY" but for the purposes of having this be unique from UPDATE
793+
# we return "MANAGE" instead
794+
return "MANAGE"
795+
return "INSERT"
796+
797+
def get_update_privilege(self) -> str:
798+
if self.dialect == "databricks":
799+
return "MODIFY"
800+
return "UPDATE"
801+
782802
def _cleanup_user_or_role(self, user_name: str) -> None:
783803
"""Helper function to clean up a PostgreSQL user and all their dependencies."""
784804
try:
@@ -792,6 +812,8 @@ def _cleanup_user_or_role(self, user_name: str) -> None:
792812
self.engine_adapter.execute(f'DROP USER IF EXISTS "{user_name}"')
793813
elif self.dialect == "snowflake":
794814
self.engine_adapter.execute(f"DROP ROLE IF EXISTS {user_name}")
815+
elif self.dialect == "databricks":
816+
pass
795817
except Exception:
796818
pass
797819

tests/core/engine_adapter/integration/test_integration.py

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3843,23 +3843,24 @@ def test_sync_grants_config(ctx: TestContext) -> None:
38433843
)
38443844

38453845
table = ctx.table("sync_grants_integration")
3846-
3846+
insert_privilege = ctx.get_insert_privilege()
3847+
update_privilege = ctx.get_update_privilege()
38473848
with ctx.create_users_or_roles("reader", "writer", "admin") as roles:
38483849
ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")})
38493850

38503851
initial_grants = {
38513852
"SELECT": [roles["reader"]],
3852-
"INSERT": [roles["writer"]],
3853+
insert_privilege: [roles["writer"]],
38533854
}
38543855
ctx.engine_adapter.sync_grants_config(table, initial_grants)
38553856

38563857
current_grants = ctx.engine_adapter._get_current_grants_config(table)
38573858
assert set(current_grants.get("SELECT", [])) == {roles["reader"]}
3858-
assert set(current_grants.get("INSERT", [])) == {roles["writer"]}
3859+
assert set(current_grants.get(insert_privilege, [])) == {roles["writer"]}
38593860

38603861
target_grants = {
38613862
"SELECT": [roles["writer"], roles["admin"]],
3862-
"UPDATE": [roles["admin"]],
3863+
update_privilege: [roles["admin"]],
38633864
}
38643865
ctx.engine_adapter.sync_grants_config(table, target_grants)
38653866

@@ -3868,8 +3869,8 @@ def test_sync_grants_config(ctx: TestContext) -> None:
38683869
roles["writer"],
38693870
roles["admin"],
38703871
}
3871-
assert set(synced_grants.get("UPDATE", [])) == {roles["admin"]}
3872-
assert synced_grants.get("INSERT", []) == []
3872+
assert set(synced_grants.get(update_privilege, [])) == {roles["admin"]}
3873+
assert synced_grants.get(insert_privilege, []) == []
38733874

38743875

38753876
def test_grants_sync_empty_config(ctx: TestContext):
@@ -3879,19 +3880,19 @@ def test_grants_sync_empty_config(ctx: TestContext):
38793880
)
38803881

38813882
table = ctx.table("grants_empty_test")
3882-
3883+
insert_privilege = ctx.get_insert_privilege()
38833884
with ctx.create_users_or_roles("user") as roles:
38843885
ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")})
38853886

38863887
initial_grants = {
38873888
"SELECT": [roles["user"]],
3888-
"INSERT": [roles["user"]],
3889+
insert_privilege: [roles["user"]],
38893890
}
38903891
ctx.engine_adapter.sync_grants_config(table, initial_grants)
38913892

38923893
initial_current_grants = ctx.engine_adapter._get_current_grants_config(table)
38933894
assert roles["user"] in initial_current_grants.get("SELECT", [])
3894-
assert roles["user"] in initial_current_grants.get("INSERT", [])
3895+
assert roles["user"] in initial_current_grants.get(insert_privilege, [])
38953896

38963897
ctx.engine_adapter.sync_grants_config(table, {})
38973898

@@ -3905,18 +3906,12 @@ def test_grants_case_insensitive_grantees(ctx: TestContext):
39053906
f"Skipping Test since engine adapter {ctx.engine_adapter.dialect} doesn't support grants"
39063907
)
39073908

3908-
with ctx.create_users_or_roles("test_reader", "test_writer") as roles:
3909+
with ctx.create_users_or_roles("reader", "writer") as roles:
39093910
table = ctx.table("grants_quoted_test")
39103911
ctx.engine_adapter.create_table(table, {"id": exp.DataType.build("INT")})
39113912

3912-
test_schema = table.db
3913-
for role_credentials in roles.values():
3914-
ctx.engine_adapter.execute(
3915-
f'GRANT USAGE ON SCHEMA "{test_schema}" TO "{role_credentials}"'
3916-
)
3917-
3918-
reader = roles["test_reader"]
3919-
writer = roles["test_writer"]
3913+
reader = roles["reader"]
3914+
writer = roles["writer"]
39203915

39213916
grants_config = {"SELECT": [reader, writer.upper()]}
39223917
ctx.engine_adapter.sync_grants_config(table, grants_config)
@@ -3941,7 +3936,8 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path):
39413936
f"Skipping Test since engine adapter {ctx.engine_adapter.dialect} doesn't support grants"
39423937
)
39433938

3944-
table = ctx.table("grant_model").sql(dialect=ctx.dialect)
3939+
table = ctx.table("grant_model").sql(dialect="duckdb")
3940+
insert_privilege = ctx.get_insert_privilege()
39453941
with ctx.create_users_or_roles("analyst", "etl_user") as roles:
39463942
(tmp_path / "models").mkdir(exist_ok=True)
39473943

@@ -3990,7 +3986,7 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path):
39903986
kind FULL,
39913987
grants (
39923988
'select' = ['{roles["analyst"]}', '{roles["etl_user"]}'],
3993-
'insert' = ['{roles["etl_user"]}']
3989+
'{insert_privilege}' = ['{roles["etl_user"]}']
39943990
),
39953991
grants_target_layer 'all'
39963992
);
@@ -4015,14 +4011,17 @@ def test_grants_plan(ctx: TestContext, tmp_path: Path):
40154011
)
40164012
expected_final_grants = {
40174013
"SELECT": [roles["analyst"], roles["etl_user"]],
4018-
"INSERT": [roles["etl_user"]],
4014+
insert_privilege: [roles["etl_user"]],
40194015
}
40204016
assert set(final_grants.get("SELECT", [])) == set(expected_final_grants["SELECT"])
4021-
assert final_grants.get("INSERT", []) == expected_final_grants["INSERT"]
4017+
assert final_grants.get(insert_privilege, []) == expected_final_grants[insert_privilege]
40224018

40234019
# Virtual layer should also have the updated grants
40244020
updated_virtual_grants = ctx.engine_adapter._get_current_grants_config(
40254021
exp.to_table(view_name, dialect=ctx.dialect)
40264022
)
40274023
assert set(updated_virtual_grants.get("SELECT", [])) == set(expected_final_grants["SELECT"])
4028-
assert updated_virtual_grants.get("INSERT", []) == expected_final_grants["INSERT"]
4024+
assert (
4025+
updated_virtual_grants.get(insert_privilege, [])
4026+
== expected_final_grants[insert_privilege]
4027+
)

0 commit comments

Comments
 (0)