diff --git a/recce/adapter/dbt_adapter/__init__.py b/recce/adapter/dbt_adapter/__init__.py index e24ee14ec..28095ae3d 100644 --- a/recce/adapter/dbt_adapter/__init__.py +++ b/recce/adapter/dbt_adapter/__init__.py @@ -992,29 +992,50 @@ def _get_schema(lineage): @staticmethod def _make_node_content_key( - node_id: str, - raw_code: Optional[str], - parent_list: List[str], + checksum: str, + parent_checksums: List[str], column_names: List[str], + adapter_type: str = "", ) -> str: """Content-based cache key for per-node CllData. Uses null byte separators between fields to prevent hash collisions - from concatenation ambiguity (e.g., id="ab"+code="cd" vs id="abc"+code="d"). + from concatenation ambiguity. + + - ``checksum``: dbt-computed sha256 of raw_code (from manifest). + - ``parent_checksums``: checksums of parent nodes (sorted). Cascading + invalidation — if any parent's SQL changes, the child recomputes. + Sources (no SQL) use their node ID as a stable placeholder. + - ``column_names``: output columns from catalog (sorted). + - ``adapter_type``: e.g. "duckdb", "snowflake" — ensures lineage + computed under one dialect is never returned for another. + + ``node_id`` is intentionally omitted here because + ``CllCache.make_node_key`` already includes it in the DB lookup key. """ h = hashlib.sha256() - h.update(node_id.encode("utf-8")) + h.update(adapter_type.encode("utf-8")) h.update(b"\x00") - h.update((raw_code or "").encode("utf-8")) + h.update(checksum.encode("utf-8")) h.update(b"\x00") - for p in sorted(parent_list): + for p in sorted(parent_checksums): h.update(p.encode("utf-8")) h.update(b"\x00") + h.update(b"\x01") # sentinel between parent_checksums and column_names lists for c in sorted(column_names): h.update(c.encode("utf-8")) h.update(b"\x00") return h.hexdigest() + @staticmethod + def _get_node_checksum(manifest, nid: str) -> str: + """Get dbt checksum for a node. Sources/exposures/metrics have no SQL, so use their node ID.""" + if nid in manifest.nodes: + cs = getattr(manifest.nodes[nid], "checksum", None) + if cs and getattr(cs, "checksum", None): + return str(cs.checksum) + return nid + @staticmethod def _serialize_cll_data(cll_data: CllData) -> str: """Serialize a per-node CllData to JSON, excluding change analysis fields (change_status, change_category).""" @@ -1048,6 +1069,9 @@ def build_full_cll_map(self) -> CllData: cache = get_cll_cache() cache.evict_stale() + # Include adapter type in cache keys so different dialects never collide + adapter_type = getattr(manifest.metadata, "adapter_type", None) or self.adapter.type() + # Collect all node IDs from all resource types all_node_ids = set() for key in ["sources", "nodes", "exposures", "metrics"]: @@ -1065,13 +1089,12 @@ def build_full_cll_map(self) -> CllData: batch_to_store = [] for node_id in all_node_ids: - raw_code = None + checksum = self._get_node_checksum(manifest, node_id) p_list: List[str] = [] col_names: List[str] = [] if node_id in manifest.nodes: n = manifest.nodes[node_id] - raw_code = n.raw_code if hasattr(n.depends_on, "nodes"): p_list = n.depends_on.nodes if catalog and node_id in catalog.nodes: @@ -1093,7 +1116,8 @@ def build_full_cll_map(self) -> CllData: if hasattr(n.depends_on, "nodes"): p_list = n.depends_on.nodes - content_key = self._make_node_content_key(node_id, raw_code, p_list, col_names) + parent_checksums = [self._get_node_checksum(manifest, pid) for pid in p_list] + content_key = self._make_node_content_key(checksum, parent_checksums, col_names, adapter_type) cached_json = cache.get_node(node_id, content_key) if cached_json: @@ -1568,7 +1592,14 @@ def _apply_all_columns(node: CllNode, transformation_type): schema = self._build_schema_from_aliases(manifest, catalog, parent_list) if not pre_compiled: - # Fall back to Jinja rendering with custom ref/source functions + # Fall back to Jinja rendering with custom ref/source functions. + # This path is fragile: the Jinja context only provides ref() and source(), + # so models using config(), is_incremental(), etc. will fail silently. + logger.warning( + "[cll] node %s: falling back to Jinja rendering " "(lineage may be incomplete)", + node_id, + ) + def ref_func(*args): node_name: str = None project_or_package: str = None diff --git a/recce/cli.py b/recce/cli.py index 42615b0c2..0379418f6 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -571,6 +571,7 @@ def _stream_download_to_file(url: str, dest: Path) -> int: manifest = dbt_adapter.base_manifest if is_base else dbt_adapter.curr_manifest catalog = dbt_adapter.base_catalog if is_base else dbt_adapter.curr_catalog + adapter_type = getattr(manifest.metadata, "adapter_type", None) or dbt_adapter.adapter.type() success = 0 fail = 0 @@ -580,18 +581,18 @@ def _stream_download_to_file(url: str, dest: Path) -> int: task = progress.add_task(f" {env_name}", total=len(node_ids)) for nid in node_ids: - raw_code = None p_list: list = [] col_names: list = [] if nid in manifest.nodes: n = manifest.nodes[nid] - raw_code = n.raw_code if hasattr(n.depends_on, "nodes"): p_list = n.depends_on.nodes if catalog and nid in catalog.nodes: col_names = list(catalog.nodes[nid].columns.keys()) - content_key = DbtAdapter._make_node_content_key(nid, raw_code, p_list, col_names) + checksum = DbtAdapter._get_node_checksum(manifest, nid) + parent_checksums = [DbtAdapter._get_node_checksum(manifest, pid) for pid in p_list] + content_key = DbtAdapter._make_node_content_key(checksum, parent_checksums, col_names, adapter_type) cached_json = cache.get_node(nid, content_key) if cached_json: cache_hits += 1 diff --git a/recce/util/cll.py b/recce/util/cll.py index b6a643e03..40bbb260c 100644 --- a/recce/util/cll.py +++ b/recce/util/cll.py @@ -32,9 +32,9 @@ class CllCache: """Per-node CllData cache backed by SQLite. Stores a serialized per-node CllData (excluding change_status/change_category) - keyed by a content hash of the node's inputs (node_id, raw_code, parent_list, - column_names). Because keys are derived from content, identical models in - base and current environments naturally share cache entries. + keyed by a content hash of the node's inputs (adapter_type, checksum, + parent_checksums, column_names). Because keys are derived from content, + identical models in base and current environments naturally share cache entries. - SQLite with WAL mode for concurrent readers. - TTL eviction: entries not accessed within ``ttl_seconds`` are deleted. diff --git a/tests/test_cli_cache.py b/tests/test_cli_cache.py index b1f534b34..50d05decb 100644 --- a/tests/test_cli_cache.py +++ b/tests/test_cli_cache.py @@ -21,12 +21,20 @@ def tmp_db(tmp_path): return str(tmp_path / "test_cll_cache.db") -def _make_mock_node(resource_type: str = "model", raw_code: str = "SELECT 1", dep_nodes: list | None = None): +def _make_mock_node( + resource_type: str = "model", + raw_code: str = "SELECT 1", + dep_nodes: list | None = None, + checksum_value: str | None = None, +): """Create a mock dbt manifest node.""" + import hashlib + node = MagicMock() node.resource_type = resource_type node.raw_code = raw_code node.depends_on.nodes = dep_nodes or [] + node.checksum.checksum = checksum_value or hashlib.sha256(raw_code.encode()).hexdigest() return node @@ -35,10 +43,14 @@ def _make_mock_adapter(nodes: dict, base_manifest=None, curr_catalog=None, base_ adapter = MagicMock() manifest = MagicMock() manifest.nodes = nodes + manifest.metadata.adapter_type = "duckdb" adapter.curr_manifest = manifest adapter.base_manifest = base_manifest + if base_manifest is not None: + base_manifest.metadata.adapter_type = "duckdb" adapter.curr_catalog = curr_catalog adapter.base_catalog = base_catalog + adapter.adapter.type.return_value = "duckdb" return adapter diff --git a/tests/util/test_cll_cache.py b/tests/util/test_cll_cache.py index 894229845..d7f1b76a4 100644 --- a/tests/util/test_cll_cache.py +++ b/tests/util/test_cll_cache.py @@ -201,65 +201,89 @@ class TestContentKeyCorrectness(unittest.TestCase): def _make_key( self, - node_id: str, - raw_code: Optional[str], - parent_list: List[str], - column_names: List[str], + checksum: str = "abc123", + parent_checksums: Optional[List[str]] = None, + column_names: Optional[List[str]] = None, + adapter_type: str = "", ) -> str: """Replicate the content key algorithm from the adapter.""" from recce.adapter.dbt_adapter import DbtAdapter - return DbtAdapter._make_node_content_key(node_id, raw_code, parent_list, column_names) + return DbtAdapter._make_node_content_key(checksum, parent_checksums or [], column_names or [], adapter_type) def test_same_inputs_same_key(self): """Identical inputs must produce the same key every time.""" - k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"]) - k2 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"]) + k1 = self._make_key("cs_a", ["cs_b"], ["col1"]) + k2 = self._make_key("cs_a", ["cs_b"], ["col1"]) assert k1 == k2 - def test_different_sql_different_key(self): - """Changing raw SQL must produce a different key.""" - k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"]) - k2 = self._make_key("model.a", "SELECT 2", ["model.b"], ["col1"]) + def test_different_checksum_different_key(self): + """Changing the node checksum must produce a different key.""" + k1 = self._make_key("cs_v1", ["cs_b"], ["col1"]) + k2 = self._make_key("cs_v2", ["cs_b"], ["col1"]) assert k1 != k2 - def test_different_parent_list_different_key(self): - """Adding or removing a parent must produce a different key.""" - k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"]) - k2 = self._make_key("model.a", "SELECT 1", ["model.b", "model.c"], ["col1"]) + def test_different_parent_checksums_different_key(self): + """Changing a parent's checksum must produce a different key.""" + k1 = self._make_key("cs_a", ["parent_v1"], ["col1"]) + k2 = self._make_key("cs_a", ["parent_v2"], ["col1"]) + assert k1 != k2 + + def test_adding_parent_different_key(self): + """Adding a parent must produce a different key.""" + k1 = self._make_key("cs_a", ["cs_b"], ["col1"]) + k2 = self._make_key("cs_a", ["cs_b", "cs_c"], ["col1"]) assert k1 != k2 def test_different_column_names_different_key(self): """Changing column names must produce a different key.""" - k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"]) - k2 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1", "col2"]) + k1 = self._make_key("cs_a", ["cs_b"], ["col1"]) + k2 = self._make_key("cs_a", ["cs_b"], ["col1", "col2"]) assert k1 != k2 def test_parent_order_independence(self): - """Parent list ordering must not affect the key (sorted internally).""" - k1 = self._make_key("model.a", "SELECT 1", ["model.c", "model.b"], ["x"]) - k2 = self._make_key("model.a", "SELECT 1", ["model.b", "model.c"], ["x"]) + """Parent checksum ordering must not affect the key (sorted internally).""" + k1 = self._make_key("cs_a", ["cs_c", "cs_b"], ["x"]) + k2 = self._make_key("cs_a", ["cs_b", "cs_c"], ["x"]) assert k1 == k2 def test_column_order_independence(self): """Column name ordering must not affect the key (sorted internally).""" - k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["z", "a"]) - k2 = self._make_key("model.a", "SELECT 1", ["model.b"], ["a", "z"]) + k1 = self._make_key("cs_a", ["cs_b"], ["z", "a"]) + k2 = self._make_key("cs_a", ["cs_b"], ["a", "z"]) assert k1 == k2 - def test_none_raw_code(self): - """None raw_code (e.g. sources) must produce a valid, deterministic key.""" - k1 = self._make_key("source.s.t", None, [], ["id", "name"]) - k2 = self._make_key("source.s.t", None, [], ["id", "name"]) + def test_source_checksum_is_node_id(self): + """Sources use node ID as checksum placeholder — must be deterministic.""" + k1 = self._make_key("source.s.t", [], ["id", "name"]) + k2 = self._make_key("source.s.t", [], ["id", "name"]) assert k1 == k2 assert len(k1) == 64 # sha256 hex digest def test_empty_inputs(self): """All-empty inputs still produce a valid key.""" - k = self._make_key("model.a", "", [], []) + k = self._make_key("", [], []) assert isinstance(k, str) assert len(k) == 64 + def test_different_adapter_type_different_key(self): + """Switching adapter (e.g. duckdb → snowflake) must produce a different key.""" + k_duck = self._make_key("cs_a", ["cs_b"], ["col1"], adapter_type="duckdb") + k_snow = self._make_key("cs_a", ["cs_b"], ["col1"], adapter_type="snowflake") + assert k_duck != k_snow + + def test_empty_adapter_type_differs_from_named(self): + """An empty adapter_type (legacy) must differ from a named adapter.""" + k_empty = self._make_key("cs_a", [], [], adapter_type="") + k_named = self._make_key("cs_a", [], [], adapter_type="duckdb") + assert k_empty != k_named + + def test_parent_vs_column_list_boundary(self): + """Moving a value from parent_checksums to column_names must produce a different key.""" + k1 = self._make_key("cs_a", ["a", "b"], []) + k2 = self._make_key("cs_a", ["a"], ["b"]) + assert k1 != k2 + # --------------------------------------------------------------------------- # Category 3: Serialization Round-trip @@ -778,7 +802,7 @@ def test_cold_cache_produces_correct_result(self): from recce.adapter.dbt_adapter import DbtAdapter json_str = DbtAdapter._serialize_cll_data(fresh_data) - content_key = DbtAdapter._make_node_content_key("model.a", "SELECT 1 AS c", [], []) + content_key = DbtAdapter._make_node_content_key("cs_a", [], []) cache.put_node("model.a", content_key, json_str) # Warm read @@ -821,7 +845,7 @@ def test_warm_cache_returns_identical_data(self): cache = CllCache(db_path=db_path) json_str = DbtAdapter._serialize_cll_data(fresh) - content_key = DbtAdapter._make_node_content_key("model.b", "SELECT x FROM a", ["model.a"], ["x"]) + content_key = DbtAdapter._make_node_content_key("cs_b", ["cs_a"], ["x"]) cache.put_node("model.b", content_key, json_str) # Read back from cache @@ -856,11 +880,11 @@ def test_partial_cache_mixed_hits_and_misses(self): columns={}, parent_map={}, ) - key_a = DbtAdapter._make_node_content_key("model.a", "SELECT 1", [], []) + key_a = DbtAdapter._make_node_content_key("cs_a", [], []) cache.put_node("model.a", key_a, DbtAdapter._serialize_cll_data(data_a)) # model.b is NOT cached - key_b = DbtAdapter._make_node_content_key("model.b", "SELECT x FROM a", ["model.a"], ["x"]) + key_b = DbtAdapter._make_node_content_key("cs_b", ["cs_a"], ["x"]) # Verify hit and miss assert cache.get_node("model.a", key_a) is not None # hit @@ -875,11 +899,8 @@ def test_model_sql_change_invalidates_cache(self): db_path = os.path.join(tmpdir, "cll_cache.db") cache = CllCache(db_path=db_path) - old_sql = "SELECT id FROM source" - new_sql = "SELECT id, name FROM source" - - old_key = DbtAdapter._make_node_content_key("model.m", old_sql, ["source.s.t"], ["id"]) - new_key = DbtAdapter._make_node_content_key("model.m", new_sql, ["source.s.t"], ["id", "name"]) + old_key = DbtAdapter._make_node_content_key("cs_old", ["source.s.t"], ["id"]) + new_key = DbtAdapter._make_node_content_key("cs_new", ["source.s.t"], ["id", "name"]) # Store with old SQL data = CllData( @@ -889,7 +910,7 @@ def test_model_sql_change_invalidates_cache(self): name="m", package_name="p", resource_type="model", - raw_code=old_sql, + raw_code="SELECT id FROM source", ), }, columns={}, @@ -915,11 +936,11 @@ def test_unchanged_model_shares_cache_across_environments(self): cache = CllCache(db_path=db_path) # Same inputs for base and current (model unchanged) - sql = "SELECT id, name FROM raw_customers" - parents = ["source.raw.customers"] + checksum = "cs_customers" + parent_checksums = ["source.raw.customers"] # source uses ID as checksum columns = ["id", "name"] - content_key = DbtAdapter._make_node_content_key("model.customers", sql, parents, columns) + content_key = DbtAdapter._make_node_content_key(checksum, parent_checksums, columns) data = CllData( nodes={ @@ -928,7 +949,7 @@ def test_unchanged_model_shares_cache_across_environments(self): name="customers", package_name="shop", resource_type="model", - raw_code=sql, + raw_code="SELECT id, name FROM raw_customers", ), }, columns={ @@ -946,7 +967,7 @@ def test_unchanged_model_shares_cache_across_environments(self): cache.put_node("model.customers", content_key, DbtAdapter._serialize_cll_data(data)) # Current env generates the same content_key and gets a hit - current_key = DbtAdapter._make_node_content_key("model.customers", sql, parents, columns) + current_key = DbtAdapter._make_node_content_key(checksum, parent_checksums, columns) assert content_key == current_key assert cache.get_node("model.customers", current_key) is not None assert cache.stats["entries"] == 1 # single entry, shared