diff --git a/recce/cli.py b/recce/cli.py index 91c0a1967..42615b0c2 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -287,6 +287,8 @@ def version(): @cli.command(cls=TrackCommand) @add_options(dbt_related_options) @add_options(recce_dbt_artifact_dir_options) +@add_options(recce_cloud_options) +@add_options(recce_cloud_auth_options) @click.option( "--cache-db", help="Path to the column-level lineage cache database.", @@ -294,6 +296,12 @@ def version(): default=None, show_default=False, ) +@click.option( + "--session-id", + help="Recce Cloud session ID (for --cloud mode).", + type=click.STRING, + envvar="RECCE_SESSION_ID", +) def init(cache_db, **kwargs): """ Pre-compute column-level lineage cache from dbt artifacts. @@ -302,12 +310,18 @@ def init(cache_db, **kwargs): (~/.recce/cll_cache.db by default) so that subsequent `recce server` sessions start with a warm cache. + With --cloud, downloads artifacts from Recce Cloud, computes CLL, and + uploads the cache and CLL map back to the session's S3 bucket. + Works with one or both environments (target/ and/or target-base/). 
""" + import json import logging + import tempfile import time + import requests from rich.console import Console from rich.progress import Progress @@ -319,6 +333,159 @@ def init(cache_db, **kwargs): console = Console() console.rule("Recce Init — Building column-level lineage cache", style="orange3") + # Timeouts for HTTP requests (seconds): short for metadata, long for large files + _METADATA_TIMEOUT = 30 + _DOWNLOAD_TIMEOUT = 300 + _UPLOAD_TIMEOUT = 600 + + is_cloud = kwargs.get("cloud", False) + session_id = kwargs.get("session_id") + cloud_client = None + cloud_org_id = None + cloud_project_id = None + + if is_cloud: + from recce.util.recce_cloud import RecceCloud, RecceCloudException + + cloud_token = kwargs.get("cloud_token") or kwargs.get("api_token") + if not cloud_token: + console.print("[[red]Error[/red]] --cloud requires --cloud-token or --api-token (or GITHUB_TOKEN env var).") + exit(1) + if not session_id: + console.print("[[red]Error[/red]] --cloud requires --session-id (or RECCE_SESSION_ID env var).") + exit(1) + + cloud_client = RecceCloud(token=cloud_token) + if kwargs.get("state_file_host"): + host = kwargs["state_file_host"] + cloud_client.base_url = f"{host}/api/v1" + cloud_client.base_url_v2 = f"{host}/api/v2" + + console.print(f"[bold]Cloud mode[/bold]: session {session_id}") + + # Get session info + try: + session_info = cloud_client.get_session(session_id) + except RecceCloudException as e: + console.print(f"[[red]Error[/red]] Failed to get session: {e}") + exit(1) + if session_info.get("status") == "error": + console.print(f"[[red]Error[/red]] Failed to get session: {session_info.get('message', 'Access denied')}") + exit(1) + cloud_org_id = session_info.get("org_id") + cloud_project_id = session_info.get("project_id") + if not cloud_org_id or not cloud_project_id: + console.print(f"[[red]Error[/red]] Session {session_id} missing org_id or project_id.") + exit(1) + + # Download artifacts to local target directories + 
console.print("Downloading artifacts from Cloud...") + try: + download_urls = cloud_client.get_download_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) + except RecceCloudException as e: + console.print(f"[[red]Error[/red]] Failed to get download URLs: {e}") + exit(1) + + project_dir_path = Path(kwargs.get("project_dir") or "./") + target_path = project_dir_path / kwargs.get("target_path", "target") + target_base_path = project_dir_path / kwargs.get("target_base_path", "target-base") + target_path.mkdir(parents=True, exist_ok=True) + target_base_path.mkdir(parents=True, exist_ok=True) + + # Download current session artifacts + for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: + url = download_urls.get(artifact_key) + if url: + try: + resp = requests.get(url, timeout=_METADATA_TIMEOUT) + if resp.status_code == 200: + (target_path / filename).write_bytes(resp.content) + console.print(f" Downloaded {filename} to {target_path}") + else: + console.print( + f" [[yellow]Warning[/yellow]] Failed to download {filename}: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download {filename}: {e}") + + # Download base session artifacts + try: + base_download_urls = cloud_client.get_base_session_download_urls( + cloud_org_id, cloud_project_id, session_id=session_id + ) + except RecceCloudException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to get base session URLs: {e}") + base_download_urls = {} + for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: + url = base_download_urls.get(artifact_key) + if url: + try: + resp = requests.get(url, timeout=_METADATA_TIMEOUT) + if resp.status_code == 200: + (target_base_path / filename).write_bytes(resp.content) + console.print(f" Downloaded base {filename} to {target_base_path}") + else: + console.print( + f" [[yellow]Warning[/yellow]] 
Failed to download base {filename}: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download base {filename}: {e}") + + # Download existing CLL cache for warm start. + # Try current session first, then fall back to production (base) session. + # Use streaming to avoid loading large cache files entirely into memory. + if cache_db is None: + cache_db = _DEFAULT_DB_PATH + Path(cache_db).parent.mkdir(parents=True, exist_ok=True) + + def _stream_download_to_file(url: str, dest: Path) -> int: + """Stream a URL to a file, returning bytes written. Raises on failure.""" + resp = requests.get(url, stream=True, timeout=_DOWNLOAD_TIMEOUT) + if resp.status_code != 200: + return 0 + total = 0 + with tempfile.NamedTemporaryFile(dir=dest.parent, delete=False, suffix=".tmp") as tmp: + tmp_path = Path(tmp.name) + try: + for chunk in resp.iter_content(chunk_size=8192): + tmp.write(chunk) + total += len(chunk) + tmp.flush() + except Exception: + tmp_path.unlink(missing_ok=True) + raise + if total > 0: + tmp_path.rename(dest) + else: + tmp_path.unlink(missing_ok=True) + return total + + cache_downloaded = False + cll_cache_url = download_urls.get("cll_cache_url") + if cll_cache_url: + try: + nbytes = _stream_download_to_file(cll_cache_url, Path(cache_db)) + if nbytes > 0: + console.print(f" Downloaded CLL cache from session ({nbytes / 1024 / 1024:.1f} MB)") + cache_downloaded = True + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download CLL cache: {e}") + + if not cache_downloaded: + # Fall back to production (base) session cache + base_cache_url = base_download_urls.get("cll_cache_url") + if base_cache_url: + try: + nbytes = _stream_download_to_file(base_cache_url, Path(cache_db)) + if nbytes > 0: + console.print(f" Downloaded CLL cache from base session ({nbytes / 1024 / 1024:.1f} MB)") + cache_downloaded = True + except requests.RequestException as e: + 
console.print(f" [[yellow]Warning[/yellow]] Failed to download base CLL cache: {e}") + + if not cache_downloaded: + console.print(" [dim]No existing CLL cache found — will compute from scratch[/dim]") + if cache_db is None: cache_db = _DEFAULT_DB_PATH @@ -331,9 +498,10 @@ def init(cache_db, **kwargs): console.print(f"Evicted {evicted} stale cache entries (>7 days unused)") # Check which artifact directories exist - project_dir_path = Path(kwargs.get("project_dir") or "./") - target_path = project_dir_path / kwargs.get("target_path", "target") - target_base_path = project_dir_path / kwargs.get("target_base_path", "target-base") + if not is_cloud: + project_dir_path = Path(kwargs.get("project_dir") or "./") + target_path = project_dir_path / kwargs.get("target_path", "target") + target_base_path = project_dir_path / kwargs.get("target_base_path", "target-base") has_target = (target_path / "manifest.json").is_file() has_base = (target_base_path / "manifest.json").is_file() @@ -471,9 +639,106 @@ def init(cache_db, **kwargs): if fail > 3: console.print(f" [dim]... and {fail - 3} more skipped (see logs for details)[/dim]") + # Build and save the full CLL map as JSON. + # The per-node SQLite cache is warm from the loop above, so this is fast. 
+ console.print("\n[bold]Building full CLL map...[/bold]") + t_map_start = time.perf_counter() + cll_map_path = Path(cache_db).parent / "cll_map.json" + try: + full_cll_map = dbt_adapter.build_full_cll_map() + cll_map_data = full_cll_map.model_dump(mode="json") + # Write to temp file first to avoid corrupted JSON on partial write + tmp_fd, tmp_name = tempfile.mkstemp(dir=cll_map_path.parent, suffix=".tmp") + try: + with os.fdopen(tmp_fd, "w") as f: + json.dump(cll_map_data, f) + Path(tmp_name).rename(cll_map_path) + except Exception: + Path(tmp_name).unlink(missing_ok=True) + raise + map_elapsed = time.perf_counter() - t_map_start + map_size_mb = cll_map_path.stat().st_size / 1024 / 1024 + console.print( + f" CLL map saved to [bold]{cll_map_path}[/bold] " + f"({len(full_cll_map.nodes)} nodes, {len(full_cll_map.columns)} columns, " + f"{map_size_mb:.1f} MB, {map_elapsed:.1f}s)" + ) + except Exception as e: + logger.warning("[recce init] Failed to build CLL map: %s", e) + console.print(f" [[yellow]Warning[/yellow]] Failed to build CLL map: {e}") + stats = cache.stats console.print(f"\nCache saved to [bold]{cache_db}[/bold] ({stats['entries']} entries)") - console.print("Run [bold]recce server --enable-cll-cache[/bold] to use the cached lineage.") + + # Upload results to Cloud if in cloud mode + if is_cloud and cloud_client: + console.print("\n[bold]Uploading results to Cloud...[/bold]") + upload_failures = [] + try: + upload_urls = cloud_client.get_upload_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) + + # Upload CLL map + cll_map_upload_url = upload_urls.get("cll_map_url") + if cll_map_upload_url and cll_map_path.is_file(): + try: + with open(cll_map_path, "rb") as f: + resp = requests.put( + cll_map_upload_url, + data=f, + headers={"Content-Type": "application/json"}, + timeout=_UPLOAD_TIMEOUT, + ) + if resp.status_code in (200, 204): + console.print(f" Uploaded cll_map.json ({cll_map_path.stat().st_size / 1024 / 1024:.1f} MB)") + else: + 
upload_failures.append("cll_map.json") + console.print( + f" [[yellow]Warning[/yellow]] Failed to upload cll_map.json: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + upload_failures.append("cll_map.json") + console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_map.json: {e}") + elif not cll_map_upload_url: + console.print( + " [[yellow]Warning[/yellow]] No cll_map_url in upload URLs (Cloud server may need update)" + ) + + # Upload CLL cache + cll_cache_upload_url = upload_urls.get("cll_cache_url") + if cll_cache_upload_url and Path(cache_db).is_file(): + try: + with open(cache_db, "rb") as f: + resp = requests.put( + cll_cache_upload_url, + data=f, + headers={"Content-Type": "application/octet-stream"}, + timeout=_UPLOAD_TIMEOUT, + ) + if resp.status_code in (200, 204): + console.print(f" Uploaded cll_cache.db ({Path(cache_db).stat().st_size / 1024 / 1024:.1f} MB)") + else: + upload_failures.append("cll_cache.db") + console.print( + f" [[yellow]Warning[/yellow]] Failed to upload cll_cache.db: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + upload_failures.append("cll_cache.db") + console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_cache.db: {e}") + elif not cll_cache_upload_url: + logger.debug("No cll_cache_url in upload URLs — cache upload not supported yet") + + if upload_failures: + console.print( + f"[bold yellow]Cloud upload completed with warnings[/bold yellow] " + f"(failed: {', '.join(upload_failures)})" + ) + else: + console.print("[bold green]Cloud upload complete.[/bold green]") + except Exception as e: + logger.warning("[recce init] Cloud upload failed: %s", e) + console.print(f" [[yellow]Warning[/yellow]] Cloud upload failed: {e}") + else: + console.print("Run [bold]recce server --enable-cll-cache[/bold] to use the cached lineage.") @cli.command(cls=TrackCommand) diff --git a/tests/test_cli_cache.py b/tests/test_cli_cache.py index bb9d75f89..b1f534b34 100644 --- 
a/tests/test_cli_cache.py +++ b/tests/test_cli_cache.py @@ -1,4 +1,6 @@ import os +import sqlite3 +import tempfile from pathlib import Path from unittest.mock import MagicMock, patch @@ -535,6 +537,796 @@ def test_init_no_warning_when_catalog_exists(self, runner, tmp_path, tmp_db): assert "catalog.json not found" not in result.output +def _make_mock_cloud_client( + session_info: dict | None = None, + download_urls: dict | None = None, + base_download_urls: dict | None = None, + upload_urls: dict | None = None, +): + """Create a mock RecceCloud client with sensible defaults.""" + client = MagicMock() + client.get_session.return_value = session_info or { + "org_id": "org-1", + "project_id": "proj-1", + "status": "active", + } + client.get_download_urls_by_session_id.return_value = download_urls or {} + client.get_base_session_download_urls.return_value = base_download_urls or {} + client.get_upload_urls_by_session_id.return_value = upload_urls or {} + return client + + +def _make_valid_cache_db_bytes() -> bytes: + """Create a valid SQLite CLL cache database as bytes for testing warm-start downloads.""" + tmp_fd, tmp_name = tempfile.mkstemp(suffix=".db") + os.close(tmp_fd) + try: + conn = sqlite3.connect(tmp_name) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + "CREATE TABLE IF NOT EXISTS cll_cache " + "(node_id TEXT, content_key TEXT, cll_json TEXT, last_used_at REAL, PRIMARY KEY (node_id))" + ) + conn.commit() + conn.close() + return Path(tmp_name).read_bytes() + finally: + Path(tmp_name).unlink(missing_ok=True) + # Also clean up WAL/SHM files + Path(tmp_name + "-wal").unlink(missing_ok=True) + Path(tmp_name + "-shm").unlink(missing_ok=True) + + +def _make_mock_response(status_code: int = 200, content: bytes = b"{}"): + """Create a mock requests.Response with streaming support.""" + resp = MagicMock() + resp.status_code = status_code + resp.content = content + + def _iter_content(chunk_size=8192): + if content: + for i in range(0, len(content), 
chunk_size): + yield content[i : i + chunk_size] + + resp.iter_content = _iter_content + return resp + + +class TestInitCloud: + """Tests for `recce init --cloud` mode.""" + + def test_init_cloud_missing_token(self, runner, tmp_path, tmp_db): + """--cloud without --cloud-token or --api-token should error.""" + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "requires --cloud-token" in result.output + + def test_init_cloud_missing_session_id(self, runner, tmp_path, tmp_db): + """--cloud with token but no --session-id should error.""" + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "requires --session-id" in result.output + + @patch("recce.cli.RecceCloud", create=True) + def test_init_cloud_session_error(self, mock_cloud_cls, runner, tmp_path, tmp_db): + """Session info returning error status should exit 1.""" + mock_client = _make_mock_cloud_client( + session_info={"status": "error", "message": "Access denied"}, + ) + mock_cloud_cls.return_value = mock_client + + with patch("recce.util.recce_cloud.RecceCloud", mock_cloud_cls): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "Failed to get session" in result.output + + @patch("recce.cli.RecceCloud", create=True) + def test_init_cloud_missing_org_project(self, mock_cloud_cls, runner, tmp_path, tmp_db): + """Session info without org_id/project_id should exit 1.""" + mock_client = _make_mock_cloud_client( + session_info={"org_id": None, "project_id": None, "status": "active"}, + ) + mock_cloud_cls.return_value = mock_client + 
+ with patch("recce.util.recce_cloud.RecceCloud", mock_cloud_cls): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "missing org_id or project_id" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_downloads_artifacts(self, mock_load_context, runner, tmp_path, tmp_db): + """Happy path: cloud mode downloads artifacts, computes CLL, uploads results.""" + manifest_bytes = b'{"nodes": {}}' + catalog_bytes = b'{"nodes": {}}' + cache_bytes = b"" # Empty cache = no warm start + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + "catalog_url": "https://s3.example.com/catalog.json", + "cll_cache_url": "https://s3.example.com/cll_cache.db", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + "catalog_url": "https://s3.example.com/base-catalog.json", + "cll_cache_url": "https://s3.example.com/base-cll_cache.db", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + # Mock requests.get for artifact downloads + def mock_get(url, **kwargs): + if "manifest" in url: + return _make_mock_response(200, manifest_bytes) + elif "catalog" in url: + return _make_mock_response(200, catalog_bytes) + elif "cll_cache" in url: + return _make_mock_response(200, cache_bytes) + return _make_mock_response(404) + + # Mock requests.put for uploads + def mock_put(url, **kwargs): + return _make_mock_response(200) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + # build_full_cll_map returns a CllData-like object + mock_cll_map = 
MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Cloud upload complete" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_download_failure_warns(self, mock_load_context, runner, tmp_path, tmp_db): + """HTTP 404 on artifact download should warn but continue.""" + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + "catalog_url": "https://s3.example.com/catalog.json", + }, + base_download_urls={}, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + # All downloads return 404 + def mock_get(url, **kwargs): + return _make_mock_response(404) + + with ( + 
patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Failed to download" in result.output or "No dbt artifacts found" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_upload_failure_warns(self, mock_load_context, runner, tmp_path, tmp_db): + """Upload returning HTTP 500 should warn but still exit 0.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + def mock_get(url, **kwargs): + return _make_mock_response(200, manifest_bytes) + + # Uploads return 500 + def mock_put(url, **kwargs): + return _make_mock_response(500) + + with 
( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Failed to upload" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_cll_map_build_failure(self, mock_load_context, runner, tmp_path, tmp_db): + """build_full_cll_map raising exception should warn but still complete.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + adapter.build_full_cll_map.side_effect = RuntimeError("CLL map computation failed") + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + def mock_get(url, **kwargs): + return _make_mock_response(200, manifest_bytes) + + def mock_put(url, **kwargs): + return _make_mock_response(200) + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + 
"recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Failed to build CLL map" in result.output + + def test_init_cloud_get_session_raises_recce_cloud_exception(self, runner, tmp_path, tmp_db): + """get_session raising RecceCloudException (non-403 HTTP errors) should exit 1 cleanly.""" + from recce.util.recce_cloud import RecceCloudException + + mock_client = MagicMock() + mock_client.get_session.side_effect = RecceCloudException( + message="Failed to get session from Recce Cloud.", + reason="Internal Server Error", + status_code=500, + ) + + with patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "Failed to get session" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_warm_cache_from_current_session(self, mock_load_context, runner, tmp_path, tmp_db): + """When current session has a CLL cache, it should be downloaded and used (no base fallback).""" + manifest_bytes = b'{"nodes": {}}' + cache_content = _make_valid_cache_db_bytes() + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + "cll_cache_url": "https://s3.example.com/cll_cache.db", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + "cll_cache_url": "https://s3.example.com/base-cll_cache.db", + }, + ) + + def mock_get(url, **kwargs): + if "base" not in url and "cll_cache" in url: + 
return _make_mock_response(200, cache_content) + elif "manifest" in url: + return _make_mock_response(200, manifest_bytes) + return _make_mock_response(404) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Downloaded CLL cache from session" in result.output + # Should NOT fall back to base + assert "Downloaded CLL cache from base session" not in result.output + + @patch("recce.core.load_context") + def test_init_cloud_warm_cache_fallback_to_base(self, mock_load_context, runner, tmp_path, tmp_db): + """When current session has no cache, base session cache should be used.""" + manifest_bytes = b'{"nodes": {}}' + base_cache_content = _make_valid_cache_db_bytes() + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + # No cll_cache_url for current session + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + "cll_cache_url": 
"https://s3.example.com/base-cll_cache.db", + }, + ) + + def mock_get(url, **kwargs): + if "base-cll_cache" in url: + return _make_mock_response(200, base_cache_content) + elif "manifest" in url: + return _make_mock_response(200, manifest_bytes) + return _make_mock_response(404) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Downloaded CLL cache from base session" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_no_cache_computes_from_scratch(self, mock_load_context, runner, tmp_path, tmp_db): + """When neither session has a cache, the message should say computing from scratch.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + # No cll_cache_url + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + # No cll_cache_url + }, + ) + + def mock_get(url, **kwargs): + 
if "manifest" in url: + return _make_mock_response(200, manifest_bytes) + return _make_mock_response(404) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "No existing CLL cache found" in result.output + + def test_init_cloud_session_id_from_env_var(self, runner, tmp_path, tmp_db): + """RECCE_SESSION_ID env var should be accepted in place of --session-id.""" + mock_client = _make_mock_cloud_client( + session_info={"org_id": None, "project_id": None, "status": "active"}, + ) + + with patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + # No --session-id flag — use env var instead + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + env={"RECCE_SESSION_ID": "sess-from-env"}, + ) + # Should get past the "requires --session-id" check and reach session validation + assert result.exit_code == 1 + assert "missing 
org_id or project_id" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_upload_partial_failure_shows_warning(self, mock_load_context, runner, tmp_path, tmp_db): + """If one upload succeeds and one fails, message should indicate partial failure.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + def mock_get(url, **kwargs): + return _make_mock_response(200, manifest_bytes) + + # CLL map upload succeeds, cache upload fails + def mock_put(url, **kwargs): + if "cll_map" in url: + return _make_mock_response(200) + return _make_mock_response(500) + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert 
result.exit_code == 0 + assert "completed with warnings" in result.output + assert "cll_cache.db" in result.output + + +class TestInitLocalCllMap: + """Tests for cll_map.json generation in non-cloud (local) init.""" + + @patch("recce.core.load_context") + def test_init_local_creates_cll_map_json(self, mock_load_context, runner, tmp_path, tmp_db): + """recce init (local) should create cll_map.json next to the cache db.""" + import json + + _setup_target_dir(tmp_path) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {"col_a": MagicMock()} + mock_cll_map.model_dump.return_value = {"nodes": {"model.test.a": {}}, "columns": {"col_a": {}}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ): + result = runner.invoke( + cli, + ["init", "--cache-db", tmp_db, "--project-dir", str(tmp_path)], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + cll_map_path = Path(tmp_db).parent / "cll_map.json" + assert cll_map_path.is_file(), "cll_map.json should be created next to cache db" + data = json.loads(cll_map_path.read_text()) + assert "nodes" in data + assert "columns" in data + assert "CLL map saved to" in result.output + + class TestServerCllCacheFlag: def test_enable_cll_cache_activates_sqlite_cache(self, tmp_db): """--enable-cll-cache should call set_cll_cache with a real db_path."""