Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions recce/adapter/dbt_adapter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -992,29 +992,50 @@ def _get_schema(lineage):

@staticmethod
def _make_node_content_key(
node_id: str,
raw_code: Optional[str],
parent_list: List[str],
checksum: str,
parent_checksums: List[str],
column_names: List[str],
adapter_type: str = "",
) -> str:
"""Content-based cache key for per-node CllData.

Uses null byte separators between fields to prevent hash collisions
from concatenation ambiguity (e.g., id="ab"+code="cd" vs id="abc"+code="d").
from concatenation ambiguity.

- ``checksum``: dbt-computed sha256 of raw_code (from manifest).
- ``parent_checksums``: checksums of parent nodes (sorted). Cascading
invalidation — if any parent's SQL changes, the child recomputes.
Sources (no SQL) use their node ID as a stable placeholder.
- ``column_names``: output columns from catalog (sorted).
- ``adapter_type``: e.g. "duckdb", "snowflake" — ensures lineage
computed under one dialect is never returned for another.

``node_id`` is intentionally omitted here because
``CllCache.make_node_key`` already includes it in the DB lookup key.
"""
h = hashlib.sha256()
h.update(node_id.encode("utf-8"))
h.update(adapter_type.encode("utf-8"))
h.update(b"\x00")
h.update((raw_code or "").encode("utf-8"))
h.update(checksum.encode("utf-8"))
h.update(b"\x00")
for p in sorted(parent_list):
for p in sorted(parent_checksums):
h.update(p.encode("utf-8"))
h.update(b"\x00")
h.update(b"\x01") # sentinel between parent_checksums and column_names lists
for c in sorted(column_names):
h.update(c.encode("utf-8"))
h.update(b"\x00")
return h.hexdigest()

@staticmethod
def _get_node_checksum(manifest, nid: str) -> str:
"""Get dbt checksum for a node. Sources/exposures/metrics have no SQL, so use their node ID."""
if nid in manifest.nodes:
cs = getattr(manifest.nodes[nid], "checksum", None)
if cs and getattr(cs, "checksum", None):
return str(cs.checksum)
return nid

@staticmethod
def _serialize_cll_data(cll_data: CllData) -> str:
"""Serialize a per-node CllData to JSON, excluding change analysis fields (change_status, change_category)."""
Expand Down Expand Up @@ -1048,6 +1069,9 @@ def build_full_cll_map(self) -> CllData:
cache = get_cll_cache()
cache.evict_stale()

# Include adapter type in cache keys so different dialects never collide
adapter_type = getattr(manifest.metadata, "adapter_type", None) or self.adapter.type()

# Collect all node IDs from all resource types
all_node_ids = set()
for key in ["sources", "nodes", "exposures", "metrics"]:
Expand All @@ -1065,13 +1089,12 @@ def build_full_cll_map(self) -> CllData:
batch_to_store = []

for node_id in all_node_ids:
raw_code = None
checksum = self._get_node_checksum(manifest, node_id)
p_list: List[str] = []
col_names: List[str] = []

if node_id in manifest.nodes:
n = manifest.nodes[node_id]
raw_code = n.raw_code
if hasattr(n.depends_on, "nodes"):
p_list = n.depends_on.nodes
if catalog and node_id in catalog.nodes:
Expand All @@ -1093,7 +1116,8 @@ def build_full_cll_map(self) -> CllData:
if hasattr(n.depends_on, "nodes"):
p_list = n.depends_on.nodes

content_key = self._make_node_content_key(node_id, raw_code, p_list, col_names)
parent_checksums = [self._get_node_checksum(manifest, pid) for pid in p_list]
content_key = self._make_node_content_key(checksum, parent_checksums, col_names, adapter_type)

cached_json = cache.get_node(node_id, content_key)
if cached_json:
Expand Down Expand Up @@ -1568,7 +1592,14 @@ def _apply_all_columns(node: CllNode, transformation_type):
schema = self._build_schema_from_aliases(manifest, catalog, parent_list)

if not pre_compiled:
# Fall back to Jinja rendering with custom ref/source functions
# Fall back to Jinja rendering with custom ref/source functions.
# This path is fragile: the Jinja context only provides ref() and source(),
# so models using config(), is_incremental(), etc. will fail silently.
logger.warning(
"[cll] node %s: falling back to Jinja rendering " "(lineage may be incomplete)",
node_id,
)

def ref_func(*args):
node_name: str = None
project_or_package: str = None
Expand Down
7 changes: 4 additions & 3 deletions recce/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def _stream_download_to_file(url: str, dest: Path) -> int:

manifest = dbt_adapter.base_manifest if is_base else dbt_adapter.curr_manifest
catalog = dbt_adapter.base_catalog if is_base else dbt_adapter.curr_catalog
adapter_type = getattr(manifest.metadata, "adapter_type", None) or dbt_adapter.adapter.type()

success = 0
fail = 0
Expand All @@ -580,18 +581,18 @@ def _stream_download_to_file(url: str, dest: Path) -> int:
task = progress.add_task(f" {env_name}", total=len(node_ids))

for nid in node_ids:
raw_code = None
p_list: list = []
col_names: list = []
if nid in manifest.nodes:
n = manifest.nodes[nid]
raw_code = n.raw_code
if hasattr(n.depends_on, "nodes"):
p_list = n.depends_on.nodes
if catalog and nid in catalog.nodes:
col_names = list(catalog.nodes[nid].columns.keys())

content_key = DbtAdapter._make_node_content_key(nid, raw_code, p_list, col_names)
checksum = DbtAdapter._get_node_checksum(manifest, nid)
parent_checksums = [DbtAdapter._get_node_checksum(manifest, pid) for pid in p_list]
content_key = DbtAdapter._make_node_content_key(checksum, parent_checksums, col_names, adapter_type)
cached_json = cache.get_node(nid, content_key)
if cached_json:
cache_hits += 1
Expand Down
6 changes: 3 additions & 3 deletions recce/util/cll.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class CllCache:
"""Per-node CllData cache backed by SQLite.

Stores a serialized per-node CllData (excluding change_status/change_category)
keyed by a content hash of the node's inputs (node_id, raw_code, parent_list,
column_names). Because keys are derived from content, identical models in
base and current environments naturally share cache entries.
keyed by a content hash of the node's inputs (adapter_type, checksum,
parent_checksums, column_names). Because keys are derived from content,
identical models in base and current environments naturally share cache entries.

- SQLite with WAL mode for concurrent readers.
- TTL eviction: entries not accessed within ``ttl_seconds`` are deleted.
Expand Down
14 changes: 13 additions & 1 deletion tests/test_cli_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@ def tmp_db(tmp_path):
return str(tmp_path / "test_cll_cache.db")


def _make_mock_node(resource_type: str = "model", raw_code: str = "SELECT 1", dep_nodes: list | None = None):
def _make_mock_node(
resource_type: str = "model",
raw_code: str = "SELECT 1",
dep_nodes: list | None = None,
checksum_value: str | None = None,
):
"""Create a mock dbt manifest node."""
import hashlib

node = MagicMock()
node.resource_type = resource_type
node.raw_code = raw_code
node.depends_on.nodes = dep_nodes or []
node.checksum.checksum = checksum_value or hashlib.sha256(raw_code.encode()).hexdigest()
return node


Expand All @@ -35,10 +43,14 @@ def _make_mock_adapter(nodes: dict, base_manifest=None, curr_catalog=None, base_
adapter = MagicMock()
manifest = MagicMock()
manifest.nodes = nodes
manifest.metadata.adapter_type = "duckdb"
adapter.curr_manifest = manifest
adapter.base_manifest = base_manifest
if base_manifest is not None:
base_manifest.metadata.adapter_type = "duckdb"
adapter.curr_catalog = curr_catalog
adapter.base_catalog = base_catalog
adapter.adapter.type.return_value = "duckdb"
return adapter


Expand Down
105 changes: 63 additions & 42 deletions tests/util/test_cll_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,65 +201,89 @@ class TestContentKeyCorrectness(unittest.TestCase):

def _make_key(
self,
node_id: str,
raw_code: Optional[str],
parent_list: List[str],
column_names: List[str],
checksum: str = "abc123",
parent_checksums: Optional[List[str]] = None,
column_names: Optional[List[str]] = None,
adapter_type: str = "",
) -> str:
"""Replicate the content key algorithm from the adapter."""
from recce.adapter.dbt_adapter import DbtAdapter

return DbtAdapter._make_node_content_key(node_id, raw_code, parent_list, column_names)
return DbtAdapter._make_node_content_key(checksum, parent_checksums or [], column_names or [], adapter_type)

def test_same_inputs_same_key(self):
"""Identical inputs must produce the same key every time."""
k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"])
k2 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"])
k1 = self._make_key("cs_a", ["cs_b"], ["col1"])
k2 = self._make_key("cs_a", ["cs_b"], ["col1"])
assert k1 == k2

def test_different_sql_different_key(self):
"""Changing raw SQL must produce a different key."""
k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"])
k2 = self._make_key("model.a", "SELECT 2", ["model.b"], ["col1"])
def test_different_checksum_different_key(self):
"""Changing the node checksum must produce a different key."""
k1 = self._make_key("cs_v1", ["cs_b"], ["col1"])
k2 = self._make_key("cs_v2", ["cs_b"], ["col1"])
assert k1 != k2

def test_different_parent_list_different_key(self):
"""Adding or removing a parent must produce a different key."""
k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"])
k2 = self._make_key("model.a", "SELECT 1", ["model.b", "model.c"], ["col1"])
def test_different_parent_checksums_different_key(self):
"""Changing a parent's checksum must produce a different key."""
k1 = self._make_key("cs_a", ["parent_v1"], ["col1"])
k2 = self._make_key("cs_a", ["parent_v2"], ["col1"])
assert k1 != k2

def test_adding_parent_different_key(self):
"""Adding a parent must produce a different key."""
k1 = self._make_key("cs_a", ["cs_b"], ["col1"])
k2 = self._make_key("cs_a", ["cs_b", "cs_c"], ["col1"])
assert k1 != k2

def test_different_column_names_different_key(self):
"""Changing column names must produce a different key."""
k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1"])
k2 = self._make_key("model.a", "SELECT 1", ["model.b"], ["col1", "col2"])
k1 = self._make_key("cs_a", ["cs_b"], ["col1"])
k2 = self._make_key("cs_a", ["cs_b"], ["col1", "col2"])
assert k1 != k2

def test_parent_order_independence(self):
"""Parent list ordering must not affect the key (sorted internally)."""
k1 = self._make_key("model.a", "SELECT 1", ["model.c", "model.b"], ["x"])
k2 = self._make_key("model.a", "SELECT 1", ["model.b", "model.c"], ["x"])
"""Parent checksum ordering must not affect the key (sorted internally)."""
k1 = self._make_key("cs_a", ["cs_c", "cs_b"], ["x"])
k2 = self._make_key("cs_a", ["cs_b", "cs_c"], ["x"])
assert k1 == k2

def test_column_order_independence(self):
"""Column name ordering must not affect the key (sorted internally)."""
k1 = self._make_key("model.a", "SELECT 1", ["model.b"], ["z", "a"])
k2 = self._make_key("model.a", "SELECT 1", ["model.b"], ["a", "z"])
k1 = self._make_key("cs_a", ["cs_b"], ["z", "a"])
k2 = self._make_key("cs_a", ["cs_b"], ["a", "z"])
assert k1 == k2

def test_none_raw_code(self):
"""None raw_code (e.g. sources) must produce a valid, deterministic key."""
k1 = self._make_key("source.s.t", None, [], ["id", "name"])
k2 = self._make_key("source.s.t", None, [], ["id", "name"])
def test_source_checksum_is_node_id(self):
"""Sources use node ID as checksum placeholder — must be deterministic."""
k1 = self._make_key("source.s.t", [], ["id", "name"])
k2 = self._make_key("source.s.t", [], ["id", "name"])
assert k1 == k2
assert len(k1) == 64 # sha256 hex digest

def test_empty_inputs(self):
"""All-empty inputs still produce a valid key."""
k = self._make_key("model.a", "", [], [])
k = self._make_key("", [], [])
assert isinstance(k, str)
assert len(k) == 64

def test_different_adapter_type_different_key(self):
"""Switching adapter (e.g. duckdb → snowflake) must produce a different key."""
k_duck = self._make_key("cs_a", ["cs_b"], ["col1"], adapter_type="duckdb")
k_snow = self._make_key("cs_a", ["cs_b"], ["col1"], adapter_type="snowflake")
assert k_duck != k_snow

def test_empty_adapter_type_differs_from_named(self):
"""An empty adapter_type (legacy) must differ from a named adapter."""
k_empty = self._make_key("cs_a", [], [], adapter_type="")
k_named = self._make_key("cs_a", [], [], adapter_type="duckdb")
assert k_empty != k_named

def test_parent_vs_column_list_boundary(self):
"""Moving a value from parent_checksums to column_names must produce a different key."""
k1 = self._make_key("cs_a", ["a", "b"], [])
k2 = self._make_key("cs_a", ["a"], ["b"])
assert k1 != k2


# ---------------------------------------------------------------------------
# Category 3: Serialization Round-trip
Expand Down Expand Up @@ -778,7 +802,7 @@ def test_cold_cache_produces_correct_result(self):
from recce.adapter.dbt_adapter import DbtAdapter

json_str = DbtAdapter._serialize_cll_data(fresh_data)
content_key = DbtAdapter._make_node_content_key("model.a", "SELECT 1 AS c", [], [])
content_key = DbtAdapter._make_node_content_key("cs_a", [], [])
cache.put_node("model.a", content_key, json_str)

# Warm read
Expand Down Expand Up @@ -821,7 +845,7 @@ def test_warm_cache_returns_identical_data(self):
cache = CllCache(db_path=db_path)

json_str = DbtAdapter._serialize_cll_data(fresh)
content_key = DbtAdapter._make_node_content_key("model.b", "SELECT x FROM a", ["model.a"], ["x"])
content_key = DbtAdapter._make_node_content_key("cs_b", ["cs_a"], ["x"])
cache.put_node("model.b", content_key, json_str)

# Read back from cache
Expand Down Expand Up @@ -856,11 +880,11 @@ def test_partial_cache_mixed_hits_and_misses(self):
columns={},
parent_map={},
)
key_a = DbtAdapter._make_node_content_key("model.a", "SELECT 1", [], [])
key_a = DbtAdapter._make_node_content_key("cs_a", [], [])
cache.put_node("model.a", key_a, DbtAdapter._serialize_cll_data(data_a))

# model.b is NOT cached
key_b = DbtAdapter._make_node_content_key("model.b", "SELECT x FROM a", ["model.a"], ["x"])
key_b = DbtAdapter._make_node_content_key("cs_b", ["cs_a"], ["x"])

# Verify hit and miss
assert cache.get_node("model.a", key_a) is not None # hit
Expand All @@ -875,11 +899,8 @@ def test_model_sql_change_invalidates_cache(self):
db_path = os.path.join(tmpdir, "cll_cache.db")
cache = CllCache(db_path=db_path)

old_sql = "SELECT id FROM source"
new_sql = "SELECT id, name FROM source"

old_key = DbtAdapter._make_node_content_key("model.m", old_sql, ["source.s.t"], ["id"])
new_key = DbtAdapter._make_node_content_key("model.m", new_sql, ["source.s.t"], ["id", "name"])
old_key = DbtAdapter._make_node_content_key("cs_old", ["source.s.t"], ["id"])
new_key = DbtAdapter._make_node_content_key("cs_new", ["source.s.t"], ["id", "name"])

# Store with old SQL
data = CllData(
Expand All @@ -889,7 +910,7 @@ def test_model_sql_change_invalidates_cache(self):
name="m",
package_name="p",
resource_type="model",
raw_code=old_sql,
raw_code="SELECT id FROM source",
),
},
columns={},
Expand All @@ -915,11 +936,11 @@ def test_unchanged_model_shares_cache_across_environments(self):
cache = CllCache(db_path=db_path)

# Same inputs for base and current (model unchanged)
sql = "SELECT id, name FROM raw_customers"
parents = ["source.raw.customers"]
checksum = "cs_customers"
parent_checksums = ["source.raw.customers"] # source uses ID as checksum
columns = ["id", "name"]

content_key = DbtAdapter._make_node_content_key("model.customers", sql, parents, columns)
content_key = DbtAdapter._make_node_content_key(checksum, parent_checksums, columns)

data = CllData(
nodes={
Expand All @@ -928,7 +949,7 @@ def test_unchanged_model_shares_cache_across_environments(self):
name="customers",
package_name="shop",
resource_type="model",
raw_code=sql,
raw_code="SELECT id, name FROM raw_customers",
),
},
columns={
Expand All @@ -946,7 +967,7 @@ def test_unchanged_model_shares_cache_across_environments(self):
cache.put_node("model.customers", content_key, DbtAdapter._serialize_cll_data(data))

# Current env generates the same content_key and gets a hit
current_key = DbtAdapter._make_node_content_key("model.customers", sql, parents, columns)
current_key = DbtAdapter._make_node_content_key(checksum, parent_checksums, columns)
assert content_key == current_key
assert cache.get_node("model.customers", current_key) is not None
assert cache.stats["entries"] == 1 # single entry, shared
Expand Down
Loading