From eef7c8ed060ca1f9b9a93e95dd8ed236dc915e73 Mon Sep 17 00:00:00 2001 From: even-wei Date: Thu, 9 Apr 2026 16:39:55 +0800 Subject: [PATCH 1/4] feat(cli): add --cloud mode to recce init for CLL pre-computation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `recce init --cloud --session-id ` that: 1. Downloads manifests + catalogs from Recce Cloud session 2. Downloads existing CLL cache (current session → base session fallback) 3. Computes per-node CLL and builds full CLL map 4. Uploads cll_map.json + cll_cache.db back to session S3 This enables the Cloud instance to pre-compute CLL data so the /cll endpoint can serve it without a running Recce instance (DRC-3183). The cache fallback chain (current → base → scratch) means the 200s+ cold-start only happens once per project on production metadata upload. Subsequent PR sessions reuse the warm cache. Also adds cll_map.json generation to local `recce init` (non-cloud), saved alongside the SQLite cache for local development use. Resolves DRC-3181 Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: even-wei --- recce/cli.py | 183 +++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 179 insertions(+), 4 deletions(-) diff --git a/recce/cli.py b/recce/cli.py index 507cf2315..7ddccbe3b 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -287,6 +287,8 @@ def version(): @cli.command(cls=TrackCommand) @add_options(dbt_related_options) @add_options(recce_dbt_artifact_dir_options) +@add_options(recce_cloud_options) +@add_options(recce_cloud_auth_options) @click.option( "--cache-db", help="Path to the column-level lineage cache database.", @@ -294,6 +296,12 @@ def version(): default=None, show_default=False, ) +@click.option( + "--session-id", + help="Recce Cloud session ID (for --cloud mode).", + type=click.STRING, + envvar="RECCE_SESSION_ID", +) def init(cache_db, **kwargs): """ Pre-compute column-level lineage cache from dbt artifacts. @@ -302,9 +310,13 @@ def init(cache_db, **kwargs): (~/.recce/cll_cache.db by default) so that subsequent `recce server` sessions start with a warm cache. + With --cloud, downloads artifacts from Recce Cloud, computes CLL, and + uploads the cache and CLL map back to the session's S3 bucket. + Works with one or both environments (target/ and/or target-base/). """ + import json import logging import time @@ -319,6 +331,106 @@ def init(cache_db, **kwargs): console = Console() console.rule("Recce Init — Building column-level lineage cache", style="orange3") + is_cloud = kwargs.get("cloud", False) + session_id = kwargs.get("session_id") + cloud_client = None + cloud_org_id = None + cloud_project_id = None + + if is_cloud: + from recce.util.recce_cloud import RecceCloud + + cloud_token = kwargs.get("cloud_token") or kwargs.get("api_token") + if not cloud_token: + console.print("[[red]Error[/red]] --cloud requires --cloud-token or --api-token (or GITHUB_TOKEN env var).") + exit(1) + if not session_id: + console.print("[[red]Error[/red]] --cloud requires --session-id (or RECCE_SESSION_ID env var).") + exit(1) + + cloud_client = RecceCloud(token=cloud_token) + if kwargs.get("state_file_host"): + cloud_client.host = kwargs["state_file_host"] + + console.print(f"[bold]Cloud mode[/bold]: session {session_id}") + + # Get session info + session_info = cloud_client.get_session(session_id) + cloud_org_id = session_info.get("org_id") + cloud_project_id = session_info.get("project_id") + if not cloud_org_id or not cloud_project_id: + console.print(f"[[red]Error[/red]] Session {session_id} missing org_id or project_id.") + exit(1) + + # Download artifacts to local target directories + import requests + + console.print("Downloading artifacts from Cloud...") + download_urls = cloud_client.get_download_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) + + project_dir_path = Path(kwargs.get("project_dir") or "./") + target_path = project_dir_path / kwargs.get("target_path", "target") + target_base_path = project_dir_path / kwargs.get("target_base_path", "target-base") + target_path.mkdir(parents=True, exist_ok=True) + target_base_path.mkdir(parents=True, exist_ok=True) + + # Download current session artifacts + for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: + url = download_urls.get(artifact_key) + if url: + resp = requests.get(url) + if resp.status_code == 200: + (target_path / filename).write_bytes(resp.content) + console.print(f" Downloaded {filename} to {target_path}") + else: + console.print(f" [[yellow]Warning[/yellow]] Failed to download {filename}: HTTP {resp.status_code}") + + # Download base session artifacts + base_download_urls = cloud_client.get_base_session_download_urls( + cloud_org_id, cloud_project_id, session_id=session_id + ) + for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: + url = base_download_urls.get(artifact_key) + if url: + resp = requests.get(url) + if resp.status_code == 200: + (target_base_path / filename).write_bytes(resp.content) + console.print(f" Downloaded base {filename} to {target_base_path}") + else: + console.print( + f" [[yellow]Warning[/yellow]] Failed to download base {filename}: HTTP {resp.status_code}" + ) + + # Download existing CLL cache for warm start. + # Try current session first, then fall back to production (base) session. + if cache_db is None: + cache_db = _DEFAULT_DB_PATH + Path(cache_db).parent.mkdir(parents=True, exist_ok=True) + + cache_downloaded = False + cll_cache_url = download_urls.get("cll_cache_url") + if cll_cache_url: + resp = requests.get(cll_cache_url) + if resp.status_code == 200 and len(resp.content) > 0: + Path(cache_db).write_bytes(resp.content) + console.print(f" Downloaded CLL cache from session ({len(resp.content) / 1024 / 1024:.1f} MB)") + cache_downloaded = True + + if not cache_downloaded: + # Fall back to production (base) session cache + base_cache_url = base_download_urls.get("cll_cache_url") + if base_cache_url: + resp = requests.get(base_cache_url) + if resp.status_code == 200 and len(resp.content) > 0: + Path(cache_db).write_bytes(resp.content) + console.print( + f" Downloaded CLL cache from base session ({len(resp.content) / 1024 / 1024:.1f} MB)" + ) + cache_downloaded = True + + if not cache_downloaded: + console.print(" [dim]No existing CLL cache found — will compute from scratch[/dim]") + if cache_db is None: cache_db = _DEFAULT_DB_PATH @@ -331,9 +443,10 @@ def init(cache_db, **kwargs): console.print(f"Evicted {evicted} stale cache entries (>7 days unused)") # Check which artifact directories exist - project_dir_path = Path(kwargs.get("project_dir") or "./") - target_path = project_dir_path / kwargs.get("target_path", "target") - target_base_path = project_dir_path / kwargs.get("target_base_path", "target-base") + if not is_cloud: + project_dir_path = Path(kwargs.get("project_dir") or "./") + target_path = project_dir_path / kwargs.get("target_path", "target") + target_base_path = project_dir_path / kwargs.get("target_base_path", "target-base") has_target = (target_path / "manifest.json").is_file() has_base = (target_base_path / "manifest.json").is_file() @@ -471,9 +584,71 @@ def init(cache_db, **kwargs): if fail > 3: console.print(f" [dim]... and {fail - 3} more skipped (see logs for details)[/dim]") + # Build and save the full CLL map as JSON. + # The per-node SQLite cache is warm from the loop above, so this is fast. + console.print("\n[bold]Building full CLL map...[/bold]") + t_map_start = time.perf_counter() + try: + full_cll_map = dbt_adapter.build_full_cll_map() + cll_map_path = Path(cache_db).parent / "cll_map.json" + cll_map_data = full_cll_map.model_dump(mode="json") + with open(cll_map_path, "w") as f: + json.dump(cll_map_data, f) + map_elapsed = time.perf_counter() - t_map_start + map_size_mb = cll_map_path.stat().st_size / 1024 / 1024 + console.print( + f" CLL map saved to [bold]{cll_map_path}[/bold] " + f"({len(full_cll_map.nodes)} nodes, {len(full_cll_map.columns)} columns, " + f"{map_size_mb:.1f} MB, {map_elapsed:.1f}s)" + ) + except Exception as e: + logger.warning("[recce init] Failed to build CLL map: %s", e) + console.print(f" [[yellow]Warning[/yellow]] Failed to build CLL map: {e}") + stats = cache.stats console.print(f"\nCache saved to [bold]{cache_db}[/bold] ({stats['entries']} entries)") - console.print("Run [bold]recce server --enable-cll-cache[/bold] to use the cached lineage.") + + # Upload results to Cloud if in cloud mode + if is_cloud and cloud_client: + import requests + + console.print("\n[bold]Uploading results to Cloud...[/bold]") + try: + upload_urls = cloud_client.get_upload_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) + + # Upload CLL map + cll_map_path = Path(cache_db).parent / "cll_map.json" + cll_map_upload_url = upload_urls.get("cll_map_url") + if cll_map_upload_url and cll_map_path.is_file(): + with open(cll_map_path, "rb") as f: + resp = requests.put(cll_map_upload_url, data=f, headers={"Content-Type": "application/json"}) + if resp.status_code in (200, 204): + console.print(f" Uploaded cll_map.json ({cll_map_path.stat().st_size / 1024 / 1024:.1f} MB)") + else: + console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_map.json: HTTP {resp.status_code}") + elif not cll_map_upload_url: + console.print(" [[yellow]Warning[/yellow]] No cll_map_url in upload URLs (Cloud server may need update)") + + # Upload CLL cache + cll_cache_upload_url = upload_urls.get("cll_cache_url") + if cll_cache_upload_url and Path(cache_db).is_file(): + with open(cache_db, "rb") as f: + resp = requests.put( + cll_cache_upload_url, data=f, headers={"Content-Type": "application/octet-stream"} + ) + if resp.status_code in (200, 204): + console.print(f" Uploaded cll_cache.db ({Path(cache_db).stat().st_size / 1024 / 1024:.1f} MB)") + else: + console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_cache.db: HTTP {resp.status_code}") + elif not cll_cache_upload_url: + logger.debug("No cll_cache_url in upload URLs — cache upload not supported yet") + + console.print("[bold green]Cloud upload complete.[/bold green]") + except Exception as e: + logger.warning("[recce init] Cloud upload failed: %s", e) + console.print(f" [[yellow]Warning[/yellow]] Cloud upload failed: {e}") + else: + console.print("Run [bold]recce server --enable-cll-cache[/bold] to use the cached lineage.") @cli.command(cls=TrackCommand) From 91cc16cd63ebbc28bc457349bb83c7d954daf7ac Mon Sep 17 00:00:00 2001 From: even-wei Date: Thu, 9 Apr 2026 16:46:12 +0800 Subject: [PATCH 2/4] fix(cli): address self-review findings for recce init --cloud - Handle get_session 403 error explicitly (was producing misleading "missing org_id" error instead of "access denied") - Fix state_file_host override (was setting nonexistent .host attribute; now correctly overrides base_url and base_url_v2) - Wrap get_download_urls and get_base_session_download_urls in try/except for graceful error handling - Remove duplicate import requests Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: even-wei --- recce/cli.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/recce/cli.py b/recce/cli.py index 7ddccbe3b..5f5f9eebf 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -338,7 +338,9 @@ def init(cache_db, **kwargs): cloud_project_id = None if is_cloud: - from recce.util.recce_cloud import RecceCloud + import requests + + from recce.util.recce_cloud import RecceCloud, RecceCloudException cloud_token = kwargs.get("cloud_token") or kwargs.get("api_token") if not cloud_token: @@ -350,12 +352,17 @@ def init(cache_db, **kwargs): cloud_client = RecceCloud(token=cloud_token) if kwargs.get("state_file_host"): - cloud_client.host = kwargs["state_file_host"] + host = kwargs["state_file_host"] + cloud_client.base_url = f"{host}/api/v1" + cloud_client.base_url_v2 = f"{host}/api/v2" console.print(f"[bold]Cloud mode[/bold]: session {session_id}") # Get session info session_info = cloud_client.get_session(session_id) + if session_info.get("status") == "error": + console.print(f"[[red]Error[/red]] Failed to get session: {session_info.get('message', 'Access denied')}") + exit(1) cloud_org_id = session_info.get("org_id") cloud_project_id = session_info.get("project_id") if not cloud_org_id or not cloud_project_id: @@ -363,10 +370,12 @@ def init(cache_db, **kwargs): exit(1) # Download artifacts to local target directories - import requests - console.print("Downloading artifacts from Cloud...") - download_urls = cloud_client.get_download_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) + try: + download_urls = cloud_client.get_download_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) + except RecceCloudException as e: + console.print(f"[[red]Error[/red]] Failed to get download URLs: {e}") + exit(1) project_dir_path = Path(kwargs.get("project_dir") or "./") target_path = project_dir_path / kwargs.get("target_path", "target") @@ -386,9 +395,13 @@ def init(cache_db, **kwargs): console.print(f" [[yellow]Warning[/yellow]] Failed to download {filename}: HTTP {resp.status_code}") # Download base session artifacts - base_download_urls = cloud_client.get_base_session_download_urls( - cloud_org_id, cloud_project_id, session_id=session_id - ) + try: + base_download_urls = cloud_client.get_base_session_download_urls( + cloud_org_id, cloud_project_id, session_id=session_id + ) + except RecceCloudException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to get base session URLs: {e}") + base_download_urls = {} for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: url = base_download_urls.get(artifact_key) if url: @@ -610,8 +623,6 @@ def init(cache_db, **kwargs): # Upload results to Cloud if in cloud mode if is_cloud and cloud_client: - import requests - console.print("\n[bold]Uploading results to Cloud...[/bold]") try: upload_urls = cloud_client.get_upload_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) From 1a6b170e0fe479cab2841e1d03995dd6cc562c65 Mon Sep 17 00:00:00 2001 From: even-wei Date: Fri, 10 Apr 2026 10:02:20 +0800 Subject: [PATCH 3/4] test(cli): add coverage for recce init --cloud mode Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: even-wei --- tests/test_cli_cache.py | 365 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 365 insertions(+) diff --git a/tests/test_cli_cache.py b/tests/test_cli_cache.py index bb9d75f89..0519a053c 100644 --- a/tests/test_cli_cache.py +++ b/tests/test_cli_cache.py @@ -535,6 +535,371 @@ def test_init_no_warning_when_catalog_exists(self, runner, tmp_path, tmp_db): assert "catalog.json not found" not in result.output +def _make_mock_cloud_client( + session_info: dict | None = None, + download_urls: dict | None = None, + base_download_urls: dict | None = None, + upload_urls: dict | None = None, +): + """Create a mock RecceCloud client with sensible defaults.""" + client = MagicMock() + client.get_session.return_value = session_info or { + "org_id": "org-1", + "project_id": "proj-1", + "status": "active", + } + client.get_download_urls_by_session_id.return_value = download_urls or {} + client.get_base_session_download_urls.return_value = base_download_urls or {} + client.get_upload_urls_by_session_id.return_value = upload_urls or {} + return client + + +def _make_mock_response(status_code: int = 200, content: bytes = b"{}"): + """Create a mock requests.Response.""" + resp = MagicMock() + resp.status_code = status_code + resp.content = content + return resp + + +class TestInitCloud: + """Tests for `recce init --cloud` mode.""" + + def test_init_cloud_missing_token(self, runner, tmp_path, tmp_db): + """--cloud without --cloud-token or --api-token should error.""" + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "requires --cloud-token" in result.output + + def test_init_cloud_missing_session_id(self, runner, tmp_path, tmp_db): + """--cloud with token but no --session-id should error.""" + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "requires --session-id" in result.output + + @patch("recce.cli.RecceCloud", create=True) + def test_init_cloud_session_error(self, mock_cloud_cls, runner, tmp_path, tmp_db): + """Session info returning error status should exit 1.""" + mock_client = _make_mock_cloud_client( + session_info={"status": "error", "message": "Access denied"}, + ) + mock_cloud_cls.return_value = mock_client + + with patch("recce.util.recce_cloud.RecceCloud", mock_cloud_cls): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "Failed to get session" in result.output + + @patch("recce.cli.RecceCloud", create=True) + def test_init_cloud_missing_org_project(self, mock_cloud_cls, runner, tmp_path, tmp_db): + """Session info without org_id/project_id should exit 1.""" + mock_client = _make_mock_cloud_client( + session_info={"org_id": None, "project_id": None, "status": "active"}, + ) + mock_cloud_cls.return_value = mock_client + + with patch("recce.util.recce_cloud.RecceCloud", mock_cloud_cls): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "missing org_id or project_id" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_downloads_artifacts(self, mock_load_context, runner, tmp_path, tmp_db): + """Happy path: cloud mode downloads artifacts, computes CLL, uploads results.""" + manifest_bytes = b'{"nodes": {}}' + catalog_bytes = b'{"nodes": {}}' + cache_bytes = b"" # Empty cache = no warm start + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + "catalog_url": "https://s3.example.com/catalog.json", + "cll_cache_url": "https://s3.example.com/cll_cache.db", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + "catalog_url": "https://s3.example.com/base-catalog.json", + "cll_cache_url": "https://s3.example.com/base-cll_cache.db", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + # Mock requests.get for artifact downloads + def mock_get(url, **kwargs): + if "manifest" in url: + return _make_mock_response(200, manifest_bytes) + elif "catalog" in url: + return _make_mock_response(200, catalog_bytes) + elif "cll_cache" in url: + return _make_mock_response(200, cache_bytes) + return _make_mock_response(404) + + # Mock requests.put for uploads + def mock_put(url, **kwargs): + return _make_mock_response(200) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + # build_full_cll_map returns a CllData-like object + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Cloud upload complete" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_download_failure_warns(self, mock_load_context, runner, tmp_path, tmp_db): + """HTTP 404 on artifact download should warn but continue.""" + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + "catalog_url": "https://s3.example.com/catalog.json", + }, + base_download_urls={}, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + # All downloads return 404 + def mock_get(url, **kwargs): + return _make_mock_response(404) + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Failed to download" in result.output or "No dbt artifacts found" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_upload_failure_warns(self, mock_load_context, runner, tmp_path, tmp_db): + """Upload returning HTTP 500 should warn but still exit 0.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + def mock_get(url, **kwargs): + return _make_mock_response(200, manifest_bytes) + + # Uploads return 500 + def mock_put(url, **kwargs): + return _make_mock_response(500) + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Failed to upload" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_cll_map_build_failure(self, mock_load_context, runner, tmp_path, tmp_db): + """build_full_cll_map raising exception should warn but still complete.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + adapter.build_full_cll_map.side_effect = RuntimeError("CLL map computation failed") + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + def mock_get(url, **kwargs): + return _make_mock_response(200, manifest_bytes) + + def mock_put(url, **kwargs): + return _make_mock_response(200) + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", "ghp_testtoken", + "--session-id", "sess-1", + "--cache-db", tmp_db, + "--project-dir", str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Failed to build CLL map" in result.output + + class TestServerCllCacheFlag: def test_enable_cll_cache_activates_sqlite_cache(self, tmp_db): """--enable-cll-cache should call set_cll_cache with a real db_path.""" From 3f36e93d9d4518439a774dec557fd928bcaa35c1 Mon Sep 17 00:00:00 2001 From: Jared Scott Date: Fri, 10 Apr 2026 11:52:23 +0800 Subject: [PATCH 4/4] fix(cli): harden recce init --cloud error handling, timeouts, and test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address all findings from Claude Code review and Copilot review on PR 1284: Error handling: - Wrap get_session() in try/except RecceCloudException for non-403 errors - Catch requests.RequestException on all HTTP calls with clean warnings - Fix misleading "Cloud upload complete" — now tracks failures and shows "completed with warnings (failed: ...)" when any upload fails Robustness: - Add timeouts to all requests.get/put calls (30s metadata, 300s download, 600s upload) - Stream CLL cache downloads to temp file via iter_content() to avoid OOM on large caches - Write cll_map.json atomically via tempfile + rename to prevent corrupted uploads - Move `import requests` to function-level scope for maintainability Test coverage (7 new tests, 33 → 40): - RecceCloudException from get_session() (non-403 HTTP errors) - Warm cache fallback chain: current hit, base fallback, compute from scratch - RECCE_SESSION_ID env var path - Partial upload failure warning message - cll_map.json creation in local (non-cloud) init Co-Authored-By: Claude Opus 4.6 (1M context) Signed-off-by: Jared Scott --- recce/cli.py | 177 +++++++++++---- tests/test_cli_cache.py | 489 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 586 insertions(+), 80 deletions(-) diff --git a/recce/cli.py b/recce/cli.py index 3477c6a95..42615b0c2 100644 --- a/recce/cli.py +++ b/recce/cli.py @@ -318,8 +318,10 @@ def init(cache_db, **kwargs): import json import logging + import tempfile import time + import requests from rich.console import Console from rich.progress import Progress @@ -331,6 +333,11 @@ def init(cache_db, **kwargs): console = Console() console.rule("Recce Init — Building column-level lineage cache", style="orange3") + # Timeouts for HTTP requests (seconds): short for metadata, long for large files + _METADATA_TIMEOUT = 30 + _DOWNLOAD_TIMEOUT = 300 + _UPLOAD_TIMEOUT = 600 + is_cloud = kwargs.get("cloud", False) session_id = kwargs.get("session_id") cloud_client = None @@ -338,8 +345,6 @@ def init(cache_db, **kwargs): cloud_project_id = None if is_cloud: - import requests - from recce.util.recce_cloud import RecceCloud, RecceCloudException cloud_token = kwargs.get("cloud_token") or kwargs.get("api_token") @@ -359,7 +364,11 @@ def init(cache_db, **kwargs): console.print(f"[bold]Cloud mode[/bold]: session {session_id}") # Get session info - session_info = cloud_client.get_session(session_id) + try: + session_info = cloud_client.get_session(session_id) + except RecceCloudException as e: + console.print(f"[[red]Error[/red]] Failed to get session: {e}") + exit(1) if session_info.get("status") == "error": console.print(f"[[red]Error[/red]] Failed to get session: {session_info.get('message', 'Access denied')}") exit(1) @@ -387,12 +396,17 @@ def init(cache_db, **kwargs): for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: url = download_urls.get(artifact_key) if url: - resp = requests.get(url) - if resp.status_code == 200: - (target_path / filename).write_bytes(resp.content) - console.print(f" Downloaded {filename} to {target_path}") - else: - console.print(f" [[yellow]Warning[/yellow]] Failed to download {filename}: HTTP {resp.status_code}") + try: + resp = requests.get(url, timeout=_METADATA_TIMEOUT) + if resp.status_code == 200: + (target_path / filename).write_bytes(resp.content) + console.print(f" Downloaded {filename} to {target_path}") + else: + console.print( + f" [[yellow]Warning[/yellow]] Failed to download {filename}: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download {filename}: {e}") # Download base session artifacts try: @@ -405,41 +419,69 @@ def init(cache_db, **kwargs): for artifact_key, filename in [("manifest_url", "manifest.json"), ("catalog_url", "catalog.json")]: url = base_download_urls.get(artifact_key) if url: - resp = requests.get(url) - if resp.status_code == 200: - (target_base_path / filename).write_bytes(resp.content) - console.print(f" Downloaded base {filename} to {target_base_path}") - else: - console.print( - f" [[yellow]Warning[/yellow]] Failed to download base {filename}: HTTP {resp.status_code}" - ) + try: + resp = requests.get(url, timeout=_METADATA_TIMEOUT) + if resp.status_code == 200: + (target_base_path / filename).write_bytes(resp.content) + console.print(f" Downloaded base {filename} to {target_base_path}") + else: + console.print( + f" [[yellow]Warning[/yellow]] Failed to download base {filename}: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download base {filename}: {e}") # Download existing CLL cache for warm start. # Try current session first, then fall back to production (base) session. + # Use streaming to avoid loading large cache files entirely into memory. if cache_db is None: cache_db = _DEFAULT_DB_PATH Path(cache_db).parent.mkdir(parents=True, exist_ok=True) + def _stream_download_to_file(url: str, dest: Path) -> int: + """Stream a URL to a file, returning bytes written. Raises on failure.""" + resp = requests.get(url, stream=True, timeout=_DOWNLOAD_TIMEOUT) + if resp.status_code != 200: + return 0 + total = 0 + with tempfile.NamedTemporaryFile(dir=dest.parent, delete=False, suffix=".tmp") as tmp: + tmp_path = Path(tmp.name) + try: + for chunk in resp.iter_content(chunk_size=8192): + tmp.write(chunk) + total += len(chunk) + tmp.flush() + except Exception: + tmp_path.unlink(missing_ok=True) + raise + if total > 0: + tmp_path.rename(dest) + else: + tmp_path.unlink(missing_ok=True) + return total + cache_downloaded = False cll_cache_url = download_urls.get("cll_cache_url") if cll_cache_url: - resp = requests.get(cll_cache_url) - if resp.status_code == 200 and len(resp.content) > 0: - Path(cache_db).write_bytes(resp.content) - console.print(f" Downloaded CLL cache from session ({len(resp.content) / 1024 / 1024:.1f} MB)") - cache_downloaded = True + try: + nbytes = _stream_download_to_file(cll_cache_url, Path(cache_db)) + if nbytes > 0: + console.print(f" Downloaded CLL cache from session ({nbytes / 1024 / 1024:.1f} MB)") + cache_downloaded = True + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download CLL cache: {e}") if not cache_downloaded: # Fall back to production (base) session cache base_cache_url = base_download_urls.get("cll_cache_url") if base_cache_url: - resp = requests.get(base_cache_url) - if resp.status_code == 200 and len(resp.content) > 0: - Path(cache_db).write_bytes(resp.content) - console.print( - f" Downloaded CLL cache from base session ({len(resp.content) / 1024 / 1024:.1f} MB)" - ) - cache_downloaded = True + try: + nbytes = _stream_download_to_file(base_cache_url, Path(cache_db)) + if nbytes > 0: + console.print(f" Downloaded CLL cache from base session ({nbytes / 1024 / 1024:.1f} MB)") + cache_downloaded = True + except requests.RequestException as e: + console.print(f" [[yellow]Warning[/yellow]] Failed to download base CLL cache: {e}") if not cache_downloaded: console.print(" [dim]No existing CLL cache found — will compute from scratch[/dim]") @@ -601,12 +643,19 @@ def init(cache_db, **kwargs): # The per-node SQLite cache is warm from the loop above, so this is fast. console.print("\n[bold]Building full CLL map...[/bold]") t_map_start = time.perf_counter() + cll_map_path = Path(cache_db).parent / "cll_map.json" try: full_cll_map = dbt_adapter.build_full_cll_map() - cll_map_path = Path(cache_db).parent / "cll_map.json" cll_map_data = full_cll_map.model_dump(mode="json") - with open(cll_map_path, "w") as f: - json.dump(cll_map_data, f) + # Write to temp file first to avoid corrupted JSON on partial write + tmp_fd, tmp_name = tempfile.mkstemp(dir=cll_map_path.parent, suffix=".tmp") + try: + with os.fdopen(tmp_fd, "w") as f: + json.dump(cll_map_data, f) + Path(tmp_name).rename(cll_map_path) + except Exception: + Path(tmp_name).unlink(missing_ok=True) + raise map_elapsed = time.perf_counter() - t_map_start map_size_mb = cll_map_path.stat().st_size / 1024 / 1024 console.print( @@ -624,37 +673,67 @@ def init(cache_db, **kwargs): # Upload results to Cloud if in cloud mode if is_cloud and cloud_client: console.print("\n[bold]Uploading results to Cloud...[/bold]") + upload_failures = [] try: upload_urls = cloud_client.get_upload_urls_by_session_id(cloud_org_id, cloud_project_id, session_id) # Upload CLL map - cll_map_path = Path(cache_db).parent / "cll_map.json" cll_map_upload_url = upload_urls.get("cll_map_url") if cll_map_upload_url and cll_map_path.is_file(): - with open(cll_map_path, "rb") as f: - resp = requests.put(cll_map_upload_url, data=f, headers={"Content-Type": "application/json"}) - if resp.status_code in (200, 204): - console.print(f" Uploaded cll_map.json ({cll_map_path.stat().st_size / 1024 / 1024:.1f} MB)") - else: - console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_map.json: HTTP {resp.status_code}") + try: + with open(cll_map_path, "rb") as f: + resp = requests.put( + cll_map_upload_url, + data=f, + headers={"Content-Type": "application/json"}, + timeout=_UPLOAD_TIMEOUT, + ) + if resp.status_code in (200, 204): + console.print(f" Uploaded cll_map.json ({cll_map_path.stat().st_size / 1024 / 1024:.1f} MB)") + else: + upload_failures.append("cll_map.json") + console.print( + f" [[yellow]Warning[/yellow]] Failed to upload cll_map.json: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + upload_failures.append("cll_map.json") + console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_map.json: {e}") elif not cll_map_upload_url: - console.print(" [[yellow]Warning[/yellow]] No cll_map_url in upload URLs (Cloud server may need update)") + console.print( + " [[yellow]Warning[/yellow]] No cll_map_url in upload URLs (Cloud server may need update)" + ) # Upload CLL cache cll_cache_upload_url = upload_urls.get("cll_cache_url") if cll_cache_upload_url and Path(cache_db).is_file(): - with open(cache_db, "rb") as f: - resp = requests.put( - cll_cache_upload_url, data=f, headers={"Content-Type": "application/octet-stream"} - ) - if resp.status_code in (200, 204): - console.print(f" Uploaded cll_cache.db ({Path(cache_db).stat().st_size / 1024 / 1024:.1f} MB)") - else: - console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_cache.db: HTTP {resp.status_code}") + try: + with open(cache_db, "rb") as f: + resp = requests.put( + cll_cache_upload_url, + data=f, + headers={"Content-Type": "application/octet-stream"}, + timeout=_UPLOAD_TIMEOUT, + ) + if resp.status_code in (200, 204): + console.print(f" Uploaded cll_cache.db ({Path(cache_db).stat().st_size / 1024 / 1024:.1f} MB)") + else: + upload_failures.append("cll_cache.db") + console.print( + f" [[yellow]Warning[/yellow]] Failed to upload cll_cache.db: HTTP {resp.status_code}" + ) + except requests.RequestException as e: + upload_failures.append("cll_cache.db") + console.print(f" [[yellow]Warning[/yellow]] Failed to upload cll_cache.db: {e}") elif not cll_cache_upload_url: logger.debug("No cll_cache_url in upload URLs — cache upload not supported yet") - console.print("[bold green]Cloud upload complete.[/bold green]") + if upload_failures: + console.print( + f"[bold yellow]Cloud upload completed with warnings[/bold yellow] " + f"(failed: {', '.join(upload_failures)})" + ) + else: + console.print("[bold green]Cloud upload complete.[/bold green]") except Exception as e: logger.warning("[recce init] Cloud upload failed: %s", e) console.print(f" [[yellow]Warning[/yellow]] Cloud upload failed: {e}") diff --git a/tests/test_cli_cache.py b/tests/test_cli_cache.py index 0519a053c..b1f534b34 100644 --- a/tests/test_cli_cache.py +++ b/tests/test_cli_cache.py @@ -1,4 +1,6 @@ import os +import sqlite3 +import tempfile from pathlib import Path from unittest.mock import MagicMock, patch @@ -554,11 +556,39 @@ def _make_mock_cloud_client( return client +def _make_valid_cache_db_bytes() -> bytes: + """Create a valid SQLite CLL cache database as bytes for testing warm-start downloads.""" + tmp_fd, tmp_name = tempfile.mkstemp(suffix=".db") + os.close(tmp_fd) + try: + conn = sqlite3.connect(tmp_name) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute( + "CREATE TABLE IF NOT EXISTS cll_cache " + "(node_id TEXT, content_key TEXT, cll_json TEXT, last_used_at REAL, PRIMARY KEY (node_id))" + ) + conn.commit() + conn.close() + return Path(tmp_name).read_bytes() + finally: + Path(tmp_name).unlink(missing_ok=True) + # Also clean up WAL/SHM files + Path(tmp_name + "-wal").unlink(missing_ok=True) + Path(tmp_name + "-shm").unlink(missing_ok=True) + + def _make_mock_response(status_code: int = 200, content: bytes = b"{}"): - """Create a mock requests.Response.""" + """Create a mock requests.Response with streaming support.""" resp = MagicMock() resp.status_code = status_code resp.content = content + + def _iter_content(chunk_size=8192): + if content: + for i in range(0, len(content), chunk_size): + yield content[i : i + chunk_size] + + resp.iter_content = _iter_content return resp @@ -572,9 +602,12 @@ def test_init_cloud_missing_token(self, runner, tmp_path, tmp_db): [ "init", "--cloud", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], ) assert result.exit_code == 1 @@ -587,9 +620,12 @@ def test_init_cloud_missing_session_id(self, runner, tmp_path, tmp_db): [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], ) assert result.exit_code == 1 @@ -609,10 +645,14 @@ def test_init_cloud_session_error(self, mock_cloud_cls, runner, tmp_path, tmp_db [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], ) assert result.exit_code == 1 @@ -632,10 +672,14 @@ def test_init_cloud_missing_org_project(self, mock_cloud_cls, runner, tmp_path, [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], ) assert result.exit_code == 1 @@ -709,10 +753,14 @@ def mock_put(url, **kwargs): [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], catch_exceptions=False, ) @@ -764,10 +812,14 @@ def mock_get(url, **kwargs): [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], catch_exceptions=False, ) @@ -829,10 +881,14 @@ def mock_put(url, **kwargs): [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], catch_exceptions=False, ) @@ -888,10 +944,14 @@ def mock_put(url, **kwargs): [ "init", "--cloud", - "--cloud-token", "ghp_testtoken", - "--session-id", "sess-1", - "--cache-db", tmp_db, - "--project-dir", str(tmp_path), + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), ], catch_exceptions=False, ) @@ -899,6 +959,373 @@ def mock_put(url, **kwargs): assert result.exit_code == 0 assert "Failed to build CLL map" in result.output + def test_init_cloud_get_session_raises_recce_cloud_exception(self, runner, tmp_path, tmp_db): + """get_session raising RecceCloudException (non-403 HTTP errors) should exit 1 cleanly.""" + from recce.util.recce_cloud import RecceCloudException + + mock_client = MagicMock() + mock_client.get_session.side_effect = RecceCloudException( + message="Failed to get session from Recce Cloud.", + reason="Internal Server Error", + status_code=500, + ) + + with patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + ) + assert result.exit_code == 1 + assert "Failed to get session" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_warm_cache_from_current_session(self, mock_load_context, runner, tmp_path, tmp_db): + """When current session has a CLL cache, it should be downloaded and used (no base fallback).""" + manifest_bytes = b'{"nodes": {}}' + cache_content = _make_valid_cache_db_bytes() + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + "cll_cache_url": "https://s3.example.com/cll_cache.db", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + "cll_cache_url": "https://s3.example.com/base-cll_cache.db", + }, + ) + + def mock_get(url, **kwargs): + if "base" not in url and "cll_cache" in url: + return _make_mock_response(200, cache_content) + elif "manifest" in url: + return _make_mock_response(200, manifest_bytes) + return _make_mock_response(404) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Downloaded CLL cache from session" in result.output + # Should NOT fall back to base + assert "Downloaded CLL cache from base session" not in result.output + + @patch("recce.core.load_context") + def test_init_cloud_warm_cache_fallback_to_base(self, mock_load_context, runner, tmp_path, tmp_db): + """When current session has no cache, base session cache should be used.""" + manifest_bytes = b'{"nodes": {}}' + base_cache_content = _make_valid_cache_db_bytes() + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + # No cll_cache_url for current session + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + "cll_cache_url": "https://s3.example.com/base-cll_cache.db", + }, + ) + + def mock_get(url, **kwargs): + if "base-cll_cache" in url: + return _make_mock_response(200, base_cache_content) + elif "manifest" in url: + return _make_mock_response(200, manifest_bytes) + return _make_mock_response(404) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "Downloaded CLL cache from base session" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_no_cache_computes_from_scratch(self, mock_load_context, runner, tmp_path, tmp_db): + """When neither session has a cache, the message should say computing from scratch.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + # No cll_cache_url + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + # No cll_cache_url + }, + ) + + def mock_get(url, **kwargs): + if "manifest" in url: + return _make_mock_response(200, manifest_bytes) + return _make_mock_response(404) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=lambda *a, **kw: _make_mock_response(200)), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "No existing CLL cache found" in result.output + + def test_init_cloud_session_id_from_env_var(self, runner, tmp_path, tmp_db): + """RECCE_SESSION_ID env var should be accepted in place of --session-id.""" + mock_client = _make_mock_cloud_client( + session_info={"org_id": None, "project_id": None, "status": "active"}, + ) + + with patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + # No --session-id flag — use env var instead + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + env={"RECCE_SESSION_ID": "sess-from-env"}, + ) + # Should get past the "requires --session-id" check and reach session validation + assert result.exit_code == 1 + assert "missing org_id or project_id" in result.output + + @patch("recce.core.load_context") + def test_init_cloud_upload_partial_failure_shows_warning(self, mock_load_context, runner, tmp_path, tmp_db): + """If one upload succeeds and one fails, message should indicate partial failure.""" + manifest_bytes = b'{"nodes": {}}' + + mock_client = _make_mock_cloud_client( + download_urls={ + "manifest_url": "https://s3.example.com/manifest.json", + }, + base_download_urls={ + "manifest_url": "https://s3.example.com/base-manifest.json", + }, + upload_urls={ + "cll_map_url": "https://s3.example.com/upload/cll_map.json", + "cll_cache_url": "https://s3.example.com/upload/cll_cache.db", + }, + ) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + adapter.get_cll_cached.return_value = MagicMock() + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {} + mock_cll_map.model_dump.return_value = {"nodes": {}, "columns": {}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + def mock_get(url, **kwargs): + return _make_mock_response(200, manifest_bytes) + + # CLL map upload succeeds, cache upload fails + def mock_put(url, **kwargs): + if "cll_map" in url: + return _make_mock_response(200) + return _make_mock_response(500) + + with ( + patch("recce.util.recce_cloud.RecceCloud", return_value=mock_client), + patch("requests.get", side_effect=mock_get), + patch("requests.put", side_effect=mock_put), + patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ), + ): + result = runner.invoke( + cli, + [ + "init", + "--cloud", + "--cloud-token", + "ghp_testtoken", + "--session-id", + "sess-1", + "--cache-db", + tmp_db, + "--project-dir", + str(tmp_path), + ], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + assert "completed with warnings" in result.output + assert "cll_cache.db" in result.output + + +class TestInitLocalCllMap: + """Tests for cll_map.json generation in non-cloud (local) init.""" + + @patch("recce.core.load_context") + def test_init_local_creates_cll_map_json(self, mock_load_context, runner, tmp_path, tmp_db): + """recce init (local) should create cll_map.json next to the cache db.""" + import json + + _setup_target_dir(tmp_path) + + nodes = {"model.test.a": _make_mock_node(raw_code="SELECT a FROM src")} + adapter = _make_mock_adapter(nodes) + mock_cll = MagicMock() + adapter.get_cll_cached.return_value = mock_cll + + mock_cll_map = MagicMock() + mock_cll_map.nodes = {"model.test.a": MagicMock()} + mock_cll_map.columns = {"col_a": MagicMock()} + mock_cll_map.model_dump.return_value = {"nodes": {"model.test.a": {}}, "columns": {"col_a": {}}} + adapter.build_full_cll_map.return_value = mock_cll_map + + mock_ctx = MagicMock() + mock_ctx.adapter = adapter + mock_load_context.return_value = mock_ctx + + with patch( + "recce.adapter.dbt_adapter.DbtAdapter._serialize_cll_data", + return_value='{"nodes":{}, "columns":{}, "parent_map":{}}', + ): + result = runner.invoke( + cli, + ["init", "--cache-db", tmp_db, "--project-dir", str(tmp_path)], + catch_exceptions=False, + ) + + assert result.exit_code == 0 + cll_map_path = Path(tmp_db).parent / "cll_map.json" + assert cll_map_path.is_file(), "cll_map.json should be created next to cache db" + data = json.loads(cll_map_path.read_text()) + assert "nodes" in data + assert "columns" in data + assert "CLL map saved to" in result.output + class TestServerCllCacheFlag: def test_enable_cll_cache_activates_sqlite_cache(self, tmp_db):