Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 89 additions & 42 deletions recce/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,24 @@
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool

# DRC-3634: submit_run is the run-persistence entry point shared with the
# recce server's run_router. Local-mode ad-hoc diff tools import it to
# persist Runs to the in-process RecceContext so the state exported to S3
# at MCP-server shutdown carries the analysis runs.
from recce.apis.run_func import submit_run as _submit_run_fn # noqa: E402
from recce.core import RecceContext, load_context
from recce.exceptions import RecceException
from recce.server import RecceServerMode
from recce.tasks.dataframe import DataFrame
from recce.tasks.histogram import HistogramDiffTask
from recce.tasks.profile import ProfileDiffTask
from recce.tasks.query import QueryDiffTask, QueryTask
from recce.tasks.rowcount import (
PERMISSION_DENIED_INDICATORS,
SYNTAX_ERROR_INDICATORS,
TABLE_NOT_FOUND_INDICATORS,
RowCountDiffTask,
)
from recce.tasks.top_k import TopKDiffTask
from recce.tasks.valuediff import ValueDiffDetailTask, ValueDiffTask
from recce.tasks.valuediff import ValueDiffDetailTask
from recce.util.recce_cloud import RECCE_CLOUD_API_HOST, RecceCloudException

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -178,7 +181,17 @@ async def _tool_run_backed(self, run_type: str, params: Dict[str, Any]) -> Dict[
"runs",
json={"type": run_type, "params": params, "nowait": False},
)
return run.get("result", run)
result = run.get("result", run)
# DRC-3532: surface run_id alongside the result so the summary agent can
# cite the exact run inline via {{run:<run_id>}} markers. Additive
# (option i): existing result fields are preserved. Only added when the
# response actually carries a run_id and the result is a dict, so
# run-less responses and non-dict results are left untouched (the agent
# must never synthesize a run_id it was not given).
run_id = run.get("run_id")
if run_id is not None and isinstance(result, dict):
result = {**result, "run_id": str(run_id)}
return result

async def _tool_list_checks(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
checks = await self._request("GET", "checks")
Expand Down Expand Up @@ -558,6 +571,58 @@ def _maybe_add_single_env_warning(self, result: Dict[str, Any]) -> Dict[str, Any
return {**result, "_warning": SINGLE_ENV_WARNING}
return result

async def _tool_run_backed_local(self, run_type: str, params: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a run-backed diff tool in local mode (DRC-3634).

    When an in-process ``RecceContext`` is available via ``default_context()``,
    this mirrors ``CloudBackend._tool_run_backed`` (commit 98670f9e): the run
    goes through the same ``submit_run`` machinery as the recce server's
    run_router, so the Run is persisted to the in-process context and is part
    of the state exported at MCP-server shutdown. The task is executed once,
    by ``submit_run``; this method only awaits the returned future.

    When ``default_context()`` is None, the task is created and executed
    directly in the default executor and the result is returned without a
    ``run_id`` (the pre-DRC-3634 behaviour).

    Args:
        run_type: Run type name such as ``"row_count_diff"``.
        params: Task parameters, forwarded unchanged.

    Returns:
        The task result as a plain dict. A ``run_id`` string is merged in
        when the run was submitted through ``submit_run``. A result that is
        neither a dict nor exposes ``model_dump`` (including ``None``)
        becomes ``{}``.

    Raises:
        ValueError: In the fallback path, when ``run_type`` is not a valid
            ``RunType`` or ``create_task`` raises ``NotImplementedError``.
            The original exception is attached as ``__cause__``.
    """
    from recce.core import default_context

    def _as_plain_dict(raw: Any) -> Dict[str, Any]:
        # Tasks may hand back a dict, a Pydantic model, or nothing at all.
        if isinstance(raw, dict):
            return raw
        if hasattr(raw, "model_dump"):
            return raw.model_dump(mode="json")
        return {}

    if default_context() is None:
        # Fallback path: no in-process context, execute the task directly.
        from recce.apis.run_func import create_task
        from recce.models.types import RunType

        try:
            task = create_task(RunType(run_type), params)
        except (ValueError, NotImplementedError) as err:
            # Chain the cause so the underlying reason stays in the traceback.
            raise ValueError(f"Run type '{run_type}' not supported in local mode") from err
        # We are inside a coroutine, so a running loop is guaranteed to exist.
        raw_result = await asyncio.get_running_loop().run_in_executor(None, task.execute)
        return _as_plain_dict(raw_result)

    run, future = _submit_run_fn(run_type, params, triggered_by="recce_ai")
    # Await the executor future so run.result is populated before we return.
    await asyncio.wrap_future(future)
    # Surface run_id alongside the result, matching the CloudBackend shape:
    # ``result = {**result, "run_id": str(run_id)}`` (commit 98670f9e).
    return {**_as_plain_dict(run.result), "run_id": str(run.run_id)}

def _setup_handlers(self):
"""Register all tool handlers"""

Expand Down Expand Up @@ -1596,60 +1661,42 @@ async def _tool_schema_diff(self, arguments: Dict[str, Any]) -> Dict[str, Any]:

async def _tool_row_count_diff(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a row count diff as a run-backed tool.

    Args:
        arguments: Tool arguments, forwarded unchanged as the run params.

    Returns:
        The dict returned by ``_tool_run_backed_local`` after it has been
        passed through ``_maybe_add_single_env_warning``.
    """
    # DRC-3634: route through _tool_run_backed_local so the Run is persisted
    # to the in-process context and the result can carry run_id. This must be
    # the only execution: also running RowCountDiffTask directly here would
    # execute the diff twice and throw the first result away.
    result = await self._tool_run_backed_local("row_count_diff", arguments)
    return self._maybe_add_single_env_warning(result)

async def _tool_query(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a query as a run-backed tool.

    Args:
        arguments: Tool arguments. ``sql_template`` is forwarded as the only
            run parameter; a truthy ``base`` selects the ``"query_base"`` run
            type instead of ``"query"``.

    Returns:
        The dict returned by ``_tool_run_backed_local``.
    """
    # DRC-3634: route through _tool_run_backed_local so the Run is persisted
    # to the in-process context and the result can carry run_id. No direct
    # QueryTask execution happens here; an early return from one would make
    # the run-backed path unreachable.
    # Map the `base` flag to the run_type used by the server run_router:
    # QueryTask (is_base=False) -> "query"; QueryBaseTask (is_base=True) -> "query_base".
    is_base = arguments.get("base", False)
    run_type = "query_base" if is_base else "query"
    # params for QueryTask / QueryBaseTask accept {"sql_template": ...} only.
    params = {"sql_template": arguments.get("sql_template")}
    return await self._tool_run_backed_local(run_type, params)

async def _tool_query_diff(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a query diff as a run-backed tool.

    Args:
        arguments: Tool arguments, forwarded unchanged as the run params.

    Returns:
        The dict returned by ``_tool_run_backed_local`` after it has been
        passed through ``_maybe_add_single_env_warning``.
    """
    # DRC-3634: route through _tool_run_backed_local so the Run is persisted
    # to the in-process context and the result can carry run_id. This is the
    # single execution of the diff; a direct QueryDiffTask run alongside it
    # would execute the same queries twice.
    result = await self._tool_run_backed_local("query_diff", arguments)
    return self._maybe_add_single_env_warning(result)

async def _tool_profile_diff(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a profile diff as a run-backed tool.

    Args:
        arguments: Tool arguments, forwarded unchanged as the run params.

    Returns:
        The dict returned by ``_tool_run_backed_local`` after it has been
        passed through ``_maybe_add_single_env_warning``.
    """
    # DRC-3634: route through _tool_run_backed_local so the Run is persisted
    # to the in-process context and the result can carry run_id. This is the
    # single execution of the diff; a direct ProfileDiffTask run alongside it
    # would profile the model twice.
    result = await self._tool_run_backed_local("profile_diff", arguments)
    return self._maybe_add_single_env_warning(result)

async def _tool_value_diff(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a value diff as a run-backed tool.

    Args:
        arguments: Tool arguments, forwarded unchanged as the run params.

    Returns:
        The dict returned by ``_tool_run_backed_local`` after it has been
        passed through ``_maybe_add_single_env_warning``.
    """
    # DRC-3634: route through _tool_run_backed_local so the Run is persisted
    # to the in-process context and the result can carry run_id. This is the
    # single execution of the diff; a direct ValueDiffTask run alongside it
    # would execute the comparison twice.
    result = await self._tool_run_backed_local("value_diff", arguments)
    return self._maybe_add_single_env_warning(result)

async def _tool_value_diff_detail(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
18 changes: 14 additions & 4 deletions recce/tasks/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,15 @@ def execute(self):
curr_columns = [column for column in dbt_adapter.get_columns(model, base=False)]

if selected_columns:
# Only profile the columns in the filter_columns
base_columns = [column for column in base_columns if column.name in selected_columns]
curr_columns = [column for column in curr_columns if column.name in selected_columns]
# Only profile the columns in the filter_columns.
# Match case-insensitively: get_columns() returns physical catalog names,
# which case-folding warehouses (e.g. Snowflake) store UPPERCASE while
# callers supply lowercase manifest-convention names. A case-sensitive
# `column.name in selected_columns` test would drop every column and yield
# an empty profile. (DRC-3674)
requested = {name.lower() for name in selected_columns}
base_columns = [column for column in base_columns if column.name.lower() in requested]
curr_columns = [column for column in curr_columns if column.name.lower() in requested]

total = len(base_columns) + len(curr_columns)
completed = 0
Expand Down Expand Up @@ -288,7 +294,11 @@ def execute(self):
curr_columns = [column for column in dbt_adapter.get_columns(model, base=False)]

if selected_columns:
curr_columns = [column for column in curr_columns if column.name in selected_columns]
# Case-insensitive match — see ProfileDiffTask.execute (DRC-3674): physical
# catalog names are UPPERCASE on case-folding warehouses while callers supply
# lowercase names, so an exact-case filter would drop every column.
requested = {name.lower() for name in selected_columns}
curr_columns = [column for column in curr_columns if column.name.lower() in requested]

total = len(curr_columns)
completed = 0
Expand Down
78 changes: 78 additions & 0 deletions tests/tasks/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,84 @@ def validate(params: dict = {}, view_options: dict = {}):
validate({})


# =============================================================================
# Snowflake column-case regression tests (DRC-3674)
#
# Root cause: profile_diff / profile filtered the requested columns with a
# case-sensitive membership test (``column.name in selected_columns``).
# get_columns() returns physical catalog names, which case-folding warehouses
# (e.g. Snowflake) store UPPERCASE, while the cloud summary agent supplies
# lowercase manifest-convention names. The exact-case filter dropped EVERY
# column, so the per-column profiling loop never ran and the result came back
# completely empty:
# {"base":{"columns":[],"data":[]},"current":{"columns":[],"data":[]}}
#
# DuckDB preserves the CSV-header casing, so an UPPERCASE header reproduces the
# Snowflake physical-name casing end-to-end (no mocking needed).
#
# Fix: lowercase both sides of the membership test before filtering, so
# lowercase input resolves to the physical (uppercase) columns and yields the
# same non-empty profile as uppercase input. No-op on already-lowercase
# duckdb models and on quoted/case-sensitive identifiers (the SQL still profiles
# via the physical column.name).
# =============================================================================

# CSV fixture with an UPPERCASE header row, used by the DRC-3674
# case-insensitive column-matching tests.
csv_data_uppercase = """
CUSTOMER_ID,CUSTOMER_LIFETIME_VALUE,NET_CUSTOMER_LIFETIME_VALUE
1,100,90
2,200,180
3,300,270
"""


def test_profile_diff_lowercase_columns_on_uppercase_physical(dbt_test_helper):
    """DRC-3674: a lowercase column request must match UPPERCASE physical columns.

    Before the fix a lowercase request produced an empty profile on both
    sides; afterwards it must produce the same non-empty profile as the
    equivalent uppercase request.
    """
    model_name = "snowflake_customers"
    physical_names = ["CUSTOMER_LIFETIME_VALUE", "NET_CUSTOMER_LIFETIME_VALUE"]
    dbt_test_helper.create_model(model_name, csv_data_uppercase, csv_data_uppercase)

    lowercase_request = {"model": model_name, "columns": [name.lower() for name in physical_names]}
    uppercase_request = {"model": model_name, "columns": list(physical_names)}
    from_lowercase = ProfileDiffTask(lowercase_request).execute()
    from_uppercase = ProfileDiffTask(uppercase_request).execute()

    # The production bug: the lowercase request came back empty.
    assert len(from_lowercase.base.data) == 2
    assert len(from_lowercase.current.data) == 2

    # Both spellings must profile the same number of columns.
    assert len(from_lowercase.base.data) == len(from_uppercase.base.data)
    assert len(from_lowercase.current.data) == len(from_uppercase.current.data)

    # Rows are keyed by the physical (uppercase) names, not the requested spelling.
    profiled_names = {row[0] for row in from_lowercase.current.data}
    assert profiled_names == set(physical_names)


def test_profile_lowercase_columns_on_uppercase_physical(dbt_test_helper):
    """DRC-3674: single-sided ProfileTask must also map lowercase names to physical ones."""
    model_name = "snowflake_customers"
    dbt_test_helper.create_model(model_name, None, csv_data_uppercase)

    from_lowercase = ProfileTask({"model": model_name, "columns": ["customer_lifetime_value"]}).execute()
    from_uppercase = ProfileTask({"model": model_name, "columns": ["CUSTOMER_LIFETIME_VALUE"]}).execute()

    lowercase_rows = from_lowercase.current.data
    assert len(lowercase_rows) == 1
    assert len(lowercase_rows) == len(from_uppercase.current.data)
    # The profiled row carries the physical column name.
    assert lowercase_rows[0][0] == "CUSTOMER_LIFETIME_VALUE"


def test_profile_diff_lowercase_is_noop_on_lowercase_physical(dbt_test_helper):
    """Adapter-safety: case-insensitive matching changes nothing when physical names are already lowercase."""
    dbt_test_helper.create_model("customers", csv_data_base, csv_data_curr)

    request = {"model": "customers", "columns": ["name", "age"]}
    profile = ProfileDiffTask(request).execute()

    # Both requested columns are profiled on each side.
    assert len(profile.current.data) == 2
    assert len(profile.base.data) == 2


def test_profile_column_jinja_template():

class DummyAdapter:
Expand Down
44 changes: 43 additions & 1 deletion tests/test_mcp_cloud_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,47 @@ async def test_run_check_auto_approve_failure_does_not_mask_run_result(cloud_req
assert result["result"] == {"ok": True}


@pytest.mark.asyncio
async def test_run_backed_tool_surfaces_run_id_in_result(cloud_requests):
    # DRC-3532: run-backed analysis tools (row_count_diff, profile_diff,
    # value_diff, query, ...) must expose the run_id so the summary agent can
    # cite the exact run via {{run:<run_id>}} markers. Option (i): run_id is
    # merged into the result dict, so existing result fields stay in place.
    run_payload = {
        "run_id": "run-xyz",
        "result": {"results": [{"node_id": "model.pkg.orders", "base": 100, "current": 105}]},
    }
    cloud_requests.side_effect = [MockResponse(204), MockResponse(200, run_payload)]
    backend = await CloudBackend.create(session_id="sess-123", api_token="token-abc")

    tool_result = await backend.call_tool("row_count_diff", {"node_names": ["orders"]})

    assert tool_result["run_id"] == "run-xyz"
    # Additive, not a wrapper: the original result fields are still reachable.
    assert tool_result["results"][0]["base"] == 100


@pytest.mark.asyncio
async def test_run_backed_tool_without_run_id_is_unchanged(cloud_requests):
    # A run-backed response that lacks run_id must come back untouched, with
    # no new key invented (anti-fabrication: never synthesize a run_id).
    run_payload = {"result": {"results": [{"node_id": "model.pkg.orders"}]}}
    cloud_requests.side_effect = [MockResponse(204), MockResponse(200, run_payload)]
    backend = await CloudBackend.create(session_id="sess-123", api_token="token-abc")

    tool_result = await backend.call_tool("row_count_diff", {"node_names": ["orders"]})

    assert "run_id" not in tool_result
    assert tool_result["results"][0]["node_id"] == "model.pkg.orders"


@pytest.mark.asyncio
async def test_create_check_runs_lineage_diff_via_checks_run_endpoint(cloud_requests):
cloud_requests.side_effect = [
Expand Down Expand Up @@ -747,7 +788,8 @@ async def test_cloud_backend_routes_run_tool_types_through_run_backed(cloud_requ

result = await backend.call_tool("row_count_diff", {"node_names": ["customers"]})

assert result == {"customers": {"base": 1, "curr": 2}}
# run_id is now surfaced alongside the result (DRC-3532, additive).
assert result == {"customers": {"base": 1, "curr": 2}, "run_id": "run-1"}
runs_call = cloud_requests.call_args_list[1]
assert runs_call.args[1].endswith("/runs")
assert runs_call.kwargs["json"]["type"] == "row_count_diff"
Expand Down
Loading
Loading