Skip to content

Commit 3bbcff9

Browse files
committed
fix: SCD Type 2 models should apply grants on insert
1 parent 93aebca commit 3bbcff9

File tree

3 files changed

+225
-28
lines changed

3 files changed

+225
-28
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2446,6 +2446,10 @@ def insert(
24462446
f"Unexpected SCD Type 2 kind: {model.kind}. This is not expected and please report this as a bug."
24472447
)
24482448

2449+
# Apply grants after SCD Type 2 table recreation
2450+
is_snapshot_deployable = kwargs["is_snapshot_deployable"]
2451+
self._apply_grants(model, table_name, GrantsTargetLayer.PHYSICAL, is_snapshot_deployable)
2452+
24492453
def append(
24502454
self,
24512455
table_name: str,

tests/core/engine_adapter/integration/test_integration_postgres.py

Lines changed: 218 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -671,52 +671,53 @@ def test_grants_plan_full_refresh_model_via_replace(
671671
assert dev_grants == {"SELECT": [roles["reader"]["username"]]}
672672

673673

674-
def test_grants_plan_incremental_model_first_insert(
674+
def test_grants_plan_incremental_model(
675675
engine_adapter: PostgresEngineAdapter, ctx: TestContext, tmp_path: Path
676676
):
677-
with create_users(engine_adapter, "reader") as roles:
678-
# Create an incremental model with grants
677+
with create_users(engine_adapter, "reader", "writer") as roles:
679678
(tmp_path / "models").mkdir(exist_ok=True)
680-
(tmp_path / "models" / "incremental_model.sql").write_text(
681-
f"""
682-
MODEL (
683-
name test_schema.incremental_model,
684-
kind INCREMENTAL_BY_TIME_RANGE (
685-
time_column ts
686-
),
687-
grants (
688-
'SELECT' = ['{roles["reader"]["username"]}']
689-
),
690-
grants_target_layer 'all'
691-
);
692679

693-
SELECT 1 as id, @start_ds::timestamp as ts, 'data' as value
694-
"""
695-
)
680+
model_name = "incr_model"
681+
model_definition = f"""
682+
MODEL (
683+
name test_schema.{model_name},
684+
kind INCREMENTAL_BY_TIME_RANGE (
685+
time_column ts
686+
),
687+
grants (
688+
'SELECT' = ['{roles["reader"]["username"]}'],
689+
'INSERT' = ['{roles["writer"]["username"]}']
690+
),
691+
grants_target_layer 'all'
692+
);
693+
SELECT 1 as id, @start_ds::timestamp as ts, 'data' as value
694+
"""
695+
696+
(tmp_path / "models" / f"{model_name}.sql").write_text(model_definition)
696697

697698
context = ctx.create_context(path=tmp_path)
698699

699-
# First run - this will create the table via _replace_query_for_model
700+
# First plan
700701
plan_result = context.plan(
701702
"dev", start="2020-01-01", end="2020-01-01", auto_apply=True, no_prompts=True
702703
)
703-
704704
assert len(plan_result.new_snapshots) == 1
705+
705706
snapshot = plan_result.new_snapshots[0]
706707
table_name = snapshot.table_name()
707708

708-
# Physical table
709709
physical_grants = engine_adapter._get_current_grants_config(
710710
exp.to_table(table_name, dialect=engine_adapter.dialect)
711711
)
712-
assert physical_grants == {"SELECT": [roles["reader"]["username"]]}
712+
assert physical_grants.get("SELECT", []) == [roles["reader"]["username"]]
713+
assert physical_grants.get("INSERT", []) == [roles["writer"]["username"]]
713714

714-
# Virtual view
715-
dev_view_name = "test_schema__dev.incremental_model"
715+
view_name = f"test_schema__dev.{model_name}"
716716
view_grants = engine_adapter._get_current_grants_config(
717-
exp.to_table(dev_view_name, dialect=engine_adapter.dialect)
717+
exp.to_table(view_name, dialect=engine_adapter.dialect)
718718
)
719-
assert view_grants == physical_grants
719+
assert view_grants.get("SELECT", []) == [roles["reader"]["username"]]
720+
assert view_grants.get("INSERT", []) == [roles["writer"]["username"]]
720721

721722

722723
def test_grants_plan_clone_environment(
@@ -1191,3 +1192,194 @@ def test_grants_target_layer_plan_env_with_vde_dev_only(
11911192
exp.to_table(physical_table_name, dialect=engine_adapter.dialect)
11921193
)
11931194
assert roles["grantee"]["username"] in physical_grants.get("SELECT", [])
1195+
1196+
1197+
@pytest.mark.parametrize(
1198+
"model_kind",
1199+
[
1200+
"SCD_TYPE_2",
1201+
"SCD_TYPE_2_BY_TIME",
1202+
],
1203+
)
1204+
def test_grants_plan_scd_type_2_models(
1205+
engine_adapter: PostgresEngineAdapter,
1206+
ctx: TestContext,
1207+
tmp_path: Path,
1208+
model_kind: str,
1209+
):
1210+
with create_users(engine_adapter, "reader", "writer", "analyst") as roles:
1211+
(tmp_path / "models").mkdir(exist_ok=True)
1212+
model_name = "scd_model"
1213+
1214+
kind_config = f"{model_kind} (unique_key [id])"
1215+
model_definition = f"""
1216+
MODEL (
1217+
name test_schema.{model_name},
1218+
kind {kind_config},
1219+
grants (
1220+
'SELECT' = ['{roles["reader"]["username"]}'],
1221+
'INSERT' = ['{roles["writer"]["username"]}']
1222+
),
1223+
grants_target_layer 'all'
1224+
);
1225+
SELECT 1 as id, 'initial_data' as name, CURRENT_TIMESTAMP as updated_at
1226+
"""
1227+
(tmp_path / "models" / f"{model_name}.sql").write_text(model_definition)
1228+
1229+
context = ctx.create_context(path=tmp_path)
1230+
plan_result = context.plan(
1231+
"dev", start="2023-01-01", end="2023-01-01", auto_apply=True, no_prompts=True
1232+
)
1233+
assert len(plan_result.new_snapshots) == 1
1234+
1235+
current_snapshot = plan_result.new_snapshots[0]
1236+
fingerprint_version = current_snapshot.fingerprint.to_version()
1237+
physical_table_name = (
1238+
f"sqlmesh__test_schema.test_schema__{model_name}__{fingerprint_version}__dev"
1239+
)
1240+
physical_grants = engine_adapter._get_current_grants_config(
1241+
exp.to_table(physical_table_name, dialect=engine_adapter.dialect)
1242+
)
1243+
assert physical_grants.get("SELECT", []) == [roles["reader"]["username"]]
1244+
assert physical_grants.get("INSERT", []) == [roles["writer"]["username"]]
1245+
1246+
view_name = f"test_schema__dev.{model_name}"
1247+
view_grants = engine_adapter._get_current_grants_config(
1248+
exp.to_table(view_name, dialect=engine_adapter.dialect)
1249+
)
1250+
assert view_grants.get("SELECT", []) == [roles["reader"]["username"]]
1251+
assert view_grants.get("INSERT", []) == [roles["writer"]["username"]]
1252+
1253+
# Data change
1254+
updated_model_definition = f"""
1255+
MODEL (
1256+
name test_schema.{model_name},
1257+
kind {kind_config},
1258+
grants (
1259+
'SELECT' = ['{roles["reader"]["username"]}'],
1260+
'INSERT' = ['{roles["writer"]["username"]}']
1261+
),
1262+
grants_target_layer 'all'
1263+
);
1264+
SELECT 1 as id, 'updated_data' as name, CURRENT_TIMESTAMP as updated_at
1265+
"""
1266+
(tmp_path / "models" / f"{model_name}.sql").write_text(updated_model_definition)
1267+
1268+
context.load()
1269+
context.plan("dev", start="2023-01-02", end="2023-01-02", auto_apply=True, no_prompts=True)
1270+
1271+
snapshot = context.get_snapshot(f"test_schema.{model_name}")
1272+
assert snapshot
1273+
fingerprint = snapshot.fingerprint.to_version()
1274+
table_name = f"sqlmesh__test_schema.test_schema__{model_name}__{fingerprint}__dev"
1275+
data_change_grants = engine_adapter._get_current_grants_config(
1276+
exp.to_table(table_name, dialect=engine_adapter.dialect)
1277+
)
1278+
assert data_change_grants.get("SELECT", []) == [roles["reader"]["username"]]
1279+
assert data_change_grants.get("INSERT", []) == [roles["writer"]["username"]]
1280+
1281+
# Data + grants changes
1282+
grant_change_model_definition = f"""
1283+
MODEL (
1284+
name test_schema.{model_name},
1285+
kind {kind_config},
1286+
grants (
1287+
'SELECT' = ['{roles["reader"]["username"]}', '{roles["analyst"]["username"]}'],
1288+
'INSERT' = ['{roles["writer"]["username"]}'],
1289+
'UPDATE' = ['{roles["analyst"]["username"]}']
1290+
),
1291+
grants_target_layer 'all'
1292+
);
1293+
SELECT 1 as id, 'grant_changed_data' as name, CURRENT_TIMESTAMP as updated_at
1294+
"""
1295+
(tmp_path / "models" / f"{model_name}.sql").write_text(grant_change_model_definition)
1296+
1297+
context.load()
1298+
context.plan("dev", start="2023-01-03", end="2023-01-03", auto_apply=True, no_prompts=True)
1299+
1300+
snapshot = context.get_snapshot(f"test_schema.{model_name}")
1301+
assert snapshot
1302+
fingerprint = snapshot.fingerprint.to_version()
1303+
table_name = f"sqlmesh__test_schema.test_schema__{model_name}__{fingerprint}__dev"
1304+
final_grants = engine_adapter._get_current_grants_config(
1305+
exp.to_table(table_name, dialect=engine_adapter.dialect)
1306+
)
1307+
expected_select_users = {roles["reader"]["username"], roles["analyst"]["username"]}
1308+
assert set(final_grants.get("SELECT", [])) == expected_select_users
1309+
assert final_grants.get("INSERT", []) == [roles["writer"]["username"]]
1310+
assert final_grants.get("UPDATE", []) == [roles["analyst"]["username"]]
1311+
1312+
final_view_grants = engine_adapter._get_current_grants_config(
1313+
exp.to_table(view_name, dialect=engine_adapter.dialect)
1314+
)
1315+
assert set(final_view_grants.get("SELECT", [])) == expected_select_users
1316+
assert final_view_grants.get("INSERT", []) == [roles["writer"]["username"]]
1317+
assert final_view_grants.get("UPDATE", []) == [roles["analyst"]["username"]]
1318+
1319+
1320+
@pytest.mark.parametrize(
1321+
"model_kind",
1322+
[
1323+
"SCD_TYPE_2",
1324+
"SCD_TYPE_2_BY_TIME",
1325+
],
1326+
)
1327+
def test_grants_plan_scd_type_2_with_vde_dev_only(
1328+
engine_adapter: PostgresEngineAdapter,
1329+
ctx: TestContext,
1330+
tmp_path: Path,
1331+
model_kind: str,
1332+
):
1333+
with create_users(engine_adapter, "etl_user", "analyst") as roles:
1334+
(tmp_path / "models").mkdir(exist_ok=True)
1335+
model_name = "vde_scd_model"
1336+
1337+
model_def = f"""
1338+
MODEL (
1339+
name test_schema.{model_name},
1340+
kind {model_kind} (unique_key [customer_id]),
1341+
grants (
1342+
'SELECT' = ['{roles["analyst"]["username"]}'],
1343+
'INSERT' = ['{roles["etl_user"]["username"]}']
1344+
),
1345+
grants_target_layer 'all'
1346+
);
1347+
SELECT
1348+
1 as customer_id,
1349+
'active' as status,
1350+
CURRENT_TIMESTAMP as updated_at
1351+
"""
1352+
(tmp_path / "models" / f"{model_name}.sql").write_text(model_def)
1353+
1354+
context = ctx.create_context(path=tmp_path, config_mutator=_vde_dev_only_config)
1355+
1356+
# Prod
1357+
context.plan("prod", auto_apply=True, no_prompts=True)
1358+
prod_table = f"test_schema.{model_name}"
1359+
prod_grants = engine_adapter._get_current_grants_config(
1360+
exp.to_table(prod_table, dialect=engine_adapter.dialect)
1361+
)
1362+
assert roles["analyst"]["username"] in prod_grants.get("SELECT", [])
1363+
assert roles["etl_user"]["username"] in prod_grants.get("INSERT", [])
1364+
1365+
# Dev
1366+
context.plan("dev", auto_apply=True, no_prompts=True, include_unmodified=True)
1367+
dev_view = f"test_schema__dev.{model_name}"
1368+
dev_grants = engine_adapter._get_current_grants_config(
1369+
exp.to_table(dev_view, dialect=engine_adapter.dialect)
1370+
)
1371+
assert roles["analyst"]["username"] in dev_grants.get("SELECT", [])
1372+
assert roles["etl_user"]["username"] in dev_grants.get("INSERT", [])
1373+
1374+
snapshot = context.get_snapshot(f"test_schema.{model_name}")
1375+
assert snapshot
1376+
fingerprint_version = snapshot.fingerprint.to_version()
1377+
dev_physical_table_name = (
1378+
f"sqlmesh__test_schema.test_schema__{model_name}__{fingerprint_version}__dev"
1379+
)
1380+
1381+
dev_physical_grants = engine_adapter._get_current_grants_config(
1382+
exp.to_table(dev_physical_table_name, dialect=engine_adapter.dialect)
1383+
)
1384+
assert roles["analyst"]["username"] in dev_physical_grants.get("SELECT", [])
1385+
assert roles["etl_user"]["username"] in dev_physical_grants.get("INSERT", [])

tests/core/test_snapshot_evaluator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5276,6 +5276,7 @@ def test_grants_evaluator_insert_without_replace_query_for_model(
52765276
"INCREMENTAL_BY_UNIQUE_KEY",
52775277
"INCREMENTAL_UNMANAGED",
52785278
"FULL",
5279+
"SCD_TYPE_2",
52795280
],
52805281
)
52815282
def test_grants_evaluator_insert_with_replace_query_for_model(
@@ -5326,8 +5327,8 @@ def test_grants_evaluator_insert_with_replace_query_for_model(
53265327
snapshots={},
53275328
)
53285329

5329-
if model_kind_name == "FULL":
5330-
# Full refresh always calls _replace_query_for_model()
5330+
if model_kind_name in ("FULL", "SCD_TYPE_2"):
5331+
# Full refresh and SCD_TYPE_2 always recreate the table, so grants are always applied
53315332
sync_grants_mock.assert_called_once()
53325333
assert sync_grants_mock.call_args[0][1] == grants
53335334
else:

0 commit comments

Comments
 (0)