From e09917097fa2cb412d71c79aca871f8e80931070 Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Thu, 4 Jun 2026 13:09:58 +0530 Subject: [PATCH 1/7] Added the master tables for table and colum data added the tenant_id support for the tools --- mcp_server/server.py | 39 ++- services/upload_processor.py | 142 ++++++++- sql/core/columns.sql | 7 + sql/core/tables.sql | 11 + sql/core/zz_agent_upsert_unique_indexes.sql | 97 ++++++ tavro_api/api/routers/agents.py | 337 +++++++++++++++++++- tavro_app/src/services/agentApi.ts | 4 +- tavro_app/src/services/mcpClient.ts | 2 + tavro_library/agent_library.py | 318 +++++++++++++++++- 9 files changed, 944 insertions(+), 13 deletions(-) create mode 100644 sql/core/columns.sql create mode 100644 sql/core/tables.sql diff --git a/mcp_server/server.py b/mcp_server/server.py index 05fd11c..db871c8 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -343,7 +343,17 @@ async def get_agent_catalog(original_prompt: str, start_record: int = 1, record_ return {"error": "INTERNAL_ERROR", "details": str(e)} @core.tool(name="create_agent") -async def create_agent(original_prompt: str, *, agent_name: str, description: str, instruction: str, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None) -> Dict[str, Any]: +async def create_agent( + original_prompt: str, + *, + agent_name: str, + description: str, + instruction: str, + tools: Optional[List[Dict[str, Any]]] = None, + tables: Optional[List[Dict[str, Any]]] = None, + data_source: Optional[List[Dict[str, Any]]] = None, + knowledge_source: Optional[Dict[str, str]] = None, +) -> Dict[str, Any]: """ Create and register a new AI agent with defined identity, behavior, and optional integrations. @@ -361,6 +371,25 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st "name": str, "description": str } + A tool may optionally include table metadata it uses: + { + "name": str, + "description": str, + "table": {"name": str, "columns": [str]} + } + + - `tables`: Optional explicit table metadata for the agent or tools: + [ + { + "name": str, + "columns": [str], + "tool_name": str + } + ] + Use `tool_name` when the table belongs to a specific tool. + + - `data_source`: Optional relationship-style metadata using Agent/Tool/Table/Column + source and target object fields. - `knowledge_source`: A reference to an external knowledge source that the agent can use for contextual understanding. If provided, it must follow: @@ -384,7 +413,9 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st "Revenue Agent") unless the user has explicitly named them or they are confirmed to exist in the catalog context. If the agent coordinates with upstream agents, describe their roles generically (e.g. "upstream analytical agents") rather than fabricating names. - tools (Optional[List[Dict[str, str]]]): Optional list of tool definitions. + tools (Optional[List[Dict[str, Any]]]): Optional list of tool definitions. + tables (Optional[List[Dict[str, Any]]]): Optional table definitions. + data_source (Optional[List[Dict[str, Any]]]): Optional data-source relationships. knowledge_source (Optional[Dict[str, str]]): Optional knowledge source definition. Returns: @@ -403,6 +434,8 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st "description": description, "instruction": instruction, "tools": tools, + "tables": tables, + "data_source": data_source, "knowledge_source": knowledge_source, }, tenant_id, @@ -413,6 +446,8 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st description=description, instruction=instruction, tools=tools, + tables=tables, + data_source=data_source, knowledge_source=knowledge_source, tenant_id=tenant_id, ) diff --git a/services/upload_processor.py b/services/upload_processor.py index ee2327a..3c60bb9 100644 --- a/services/upload_processor.py +++ b/services/upload_processor.py @@ -277,7 +277,13 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): tools = card.get("tool", []) or [] if not has_meaningful_data(tools): return + tenant_id = ident.get("tenant_id") agent_id = ident.get("agent_id") + # If tenant_id wasn't provided on the incoming card, try to read it from the agents table + if tenant_id is None: + rows = _query(conn, f"SELECT tenant_id FROM {CORE}.agents WHERE agent_internal_id = {_sq(agent_internal_id)} LIMIT 1") + if rows and rows[0].get("tenant_id"): + tenant_id = rows[0].get("tenant_id") select_rows = [] for tool in tools: delegation_possible = ( @@ -285,7 +291,7 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): if tool.get("delegation_possible") is not None else None ) select_rows.append(f""" - SELECT {_sq(agent_internal_id)} AS agent_internal_id, {_sq(agent_id)} AS agent_id, + SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(agent_id)} AS agent_id, {_sq(tool.get('identifier'))} AS tool_id, {_sq(tool.get('name'))} AS tool_name, {_sq(tool.get('description'))} AS tool_description, {_bool(delegation_possible)}::boolean AS delegation_possible, @@ -298,18 +304,19 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): union_all = "\nUNION ALL\n".join(select_rows) _exec(conn, f""" INSERT INTO {CORE}.agent_tools ( - agent_internal_id, agent_id, tool_id, tool_name, tool_description, + tenant_id, agent_internal_id, agent_id, tool_id, tool_name, tool_description, delegation_possible, allowed_delegates, input_schema_json_text, output_schema_json_text, default_config_json_text, created_ts, updated_ts ) - SELECT agent_internal_id, agent_id, tool_id, tool_name, tool_description, + SELECT tenant_id, agent_internal_id, agent_id, tool_id, tool_name, tool_description, delegation_possible, allowed_delegates, input_schema_json_text, output_schema_json_text, default_config_json_text, now_ts, now_ts FROM ({union_all}) AS s ON CONFLICT (agent_internal_id, tool_id) DO UPDATE SET + tenant_id = EXCLUDED.tenant_id, agent_id = EXCLUDED.agent_id, tool_description = EXCLUDED.tool_description, delegation_possible = EXCLUDED.delegation_possible, @@ -321,6 +328,129 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): """, f"agent_tools upsert ({len(tools)} tools)") +# --------------------------------------------------------------------------- +# Step X — core.tables (agent -> tables, tools -> tables) +# --------------------------------------------------------------------------- +def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): + ident = card.get("identification", {}) + tenant_id = ident.get("tenant_id") + agent_id = ident.get("agent_id") + data_sources = card.get("data_source", []) or [] + if not has_meaningful_data(data_sources): + return + # If tenant_id wasn't provided on the incoming card, try to read it from the agents table + if tenant_id is None: + rows = _query(conn, f"SELECT tenant_id FROM {CORE}.agents WHERE agent_internal_id = {_sq(agent_internal_id)} LIMIT 1") + if rows and rows[0].get("tenant_id"): + tenant_id = rows[0].get("tenant_id") + + # Collect table records: table_id -> {name, tool_id} + tables = {} + # First pass: find explicit table nodes (as source or target) + # Also capture direct agent->table ownership when present. + for ds in data_sources: + src_type = ds.get("source_object_type") + tgt_type = ds.get("target_object_type") + # If source is Table, register it + if str(src_type).lower() == "table": + tid = ds.get("source_object_id") + name = ds.get("source_object_name") + if tid: + tables[tid] = tables.get(tid, {}) + if name: + tables[tid]["name"] = name + # If target is Table, register it + if str(tgt_type).lower() == "table": + tid = ds.get("target_object_id") + name = ds.get("target_object_name") + if tid: + tables[tid] = tables.get(tid, {}) + if name: + tables[tid]["name"] = name + # Direct agent -> table relationship should be stored on the table + if str(src_type).lower() == "agent": + tables[tid]["agent_id"] = ds.get("source_object_id") + + # Second pass: if a Tool -> Table relation exists, capture tool_id and drop agent_id + for ds in data_sources: + src_type = ds.get("source_object_type") + tgt_type = ds.get("target_object_type") + if str(src_type).lower() == "tool" and str(tgt_type).lower() == "table": + tool_id = ds.get("source_object_id") + table_id = ds.get("target_object_id") + if table_id and tool_id and table_id in tables: + tables[table_id]["tool_id"] = tool_id + tables[table_id].pop("agent_id", None) + + if not tables: + return + + select_rows = [] + for tid, meta in tables.items(): + name = meta.get("name") + tool_id = meta.get("tool_id") + table_agent_id = meta.get("agent_id") + select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(table_agent_id)} AS agent_id, {_sq(tool_id)} AS tool_id, {_sq(tid)} AS table_id, {_sq(name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + + union_all = "\nUNION ALL\n".join(select_rows) + _exec(conn, f""" + INSERT INTO {CORE}.tables ( + tenant_id, agent_internal_id, agent_id, tool_id, table_id, name, created_ts, updated_ts + ) + SELECT tenant_id, agent_internal_id, agent_id, tool_id, table_id, name, now_ts, now_ts + FROM ({union_all}) AS s + ON CONFLICT (agent_internal_id, table_id) + DO UPDATE SET + agent_id = CASE + WHEN EXCLUDED.tool_id IS NOT NULL THEN NULL + ELSE COALESCE(EXCLUDED.agent_id, {CORE}.tables.agent_id) + END, + name = COALESCE(EXCLUDED.name, {CORE}.tables.name), + tool_id = COALESCE(EXCLUDED.tool_id, {CORE}.tables.tool_id), + updated_ts = EXCLUDED.updated_ts + """, f"tables upsert ({len(select_rows)})") + + +# --------------------------------------------------------------------------- +# Step Y — core.columns (tables -> columns) +# --------------------------------------------------------------------------- +def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): + ident = card.get("identification", {}) + tenant_id = ident.get("tenant_id") + data_sources = card.get("data_source", []) or [] + if not has_meaningful_data(data_sources): + return + # If tenant_id wasn't provided on the incoming card, try to read it from the agents table + if tenant_id is None: + rows = _query(conn, f"SELECT tenant_id FROM {CORE}.agents WHERE agent_internal_id = {_sq(agent_internal_id)} LIMIT 1") + if rows and rows[0].get("tenant_id"): + tenant_id = rows[0].get("tenant_id") + + # Columns are represented as relationships where source is Table and target is Column + select_rows = [] + for ds in data_sources: + if str(ds.get("source_object_type")).lower() == "table" and str(ds.get("target_object_type")).lower() == "column": + table_id = ds.get("source_object_id") + col_name = ds.get("target_object_name") + if table_id and col_name: + select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(col_name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + + if not select_rows: + return + + union_all = "\nUNION ALL\n".join(select_rows) + _exec(conn, f""" + INSERT INTO {CORE}.columns ( + tenant_id, table_id, name, created_ts, updated_ts + ) + SELECT tenant_id, table_id, name, now_ts, now_ts + FROM ({union_all}) AS s + ON CONFLICT (table_id, name) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts + """, f"columns upsert ({len(select_rows)})") + + # --------------------------------------------------------------------------- # Step 5 — core.agent_controls # --------------------------------------------------------------------------- @@ -1097,11 +1227,17 @@ def process_card_for_upload(card_dict: dict, tenant_id: Optional[str] = None) -> with _db() as conn: print("[INFO] Step 1/20 - agents") agent_internal_id = _upsert_agent(conn, card_dict, now_str, incoming_source_hash, tenant_id) + # Propagate tenant_id into the card's identification so downstream upserts can read it + if tenant_id is not None: + ident = card_dict.setdefault("identification", {}) + ident["tenant_id"] = tenant_id steps = [ (" 2/20 - agent_configurations", _upsert_agent_configuration), (" 3/20 - agent_identifications", _upsert_agent_identification), (" 4/20 - agent_tools", _upsert_agent_tools), + (" 4.1/20 - tables", _upsert_tables), + (" 4.2/20 - columns", _upsert_columns), (" 5/20 - agent_controls", _upsert_agent_controls), (" 6/20 - agent_knowledge_sources", _upsert_agent_knowledge_source), (" 7/20 - agent_llm_models", _upsert_agent_llm_models), diff --git a/sql/core/columns.sql b/sql/core/columns.sql new file mode 100644 index 0000000..3b1c541 --- /dev/null +++ b/sql/core/columns.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS core.columns ( + tenant_id TEXT, + table_id TEXT, + name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/tables.sql b/sql/core/tables.sql new file mode 100644 index 0000000..9ffd715 --- /dev/null +++ b/sql/core/tables.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS core.tables ( + tenant_id TEXT, + agent_internal_id TEXT, + agent_id TEXT, + tool_id TEXT, + table_id TEXT PRIMARY KEY, + name TEXT, + country_of_provenance TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/zz_agent_upsert_unique_indexes.sql b/sql/core/zz_agent_upsert_unique_indexes.sql index 73d6aa6..09f2ea0 100644 --- a/sql/core/zz_agent_upsert_unique_indexes.sql +++ b/sql/core/zz_agent_upsert_unique_indexes.sql @@ -2,6 +2,9 @@ CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agents_current ON core.agents (agent_id, agent_name) WHERE is_current = true; +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agents_internal_id +ON core.agents (agent_internal_id); + CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agent_configurations_current ON core.agent_configurations (agent_internal_id) WHERE is_current = true; @@ -58,6 +61,15 @@ ON core.business_applications (business_application_id); CREATE UNIQUE INDEX IF NOT EXISTS ux_core_business_processes ON core.business_processes (business_process_id); +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_columns +ON core.columns (table_id, name); + +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_tables +ON core.tables (agent_internal_id, table_id); + +CREATE INDEX IF NOT EXISTS ix_core_tables_agent_tool +ON core.tables (agent_internal_id, tool_id); + DO $$ BEGIN IF to_regclass('core.agent_ai_use_cases') IS NOT NULL THEN @@ -208,6 +220,46 @@ BEGIN ON DELETE CASCADE; END IF; + -- Create helper function + triggers to populate tenant_id on insert when missing + IF to_regclass('core.agents') IS NOT NULL THEN + EXECUTE $ddl$ + CREATE OR REPLACE FUNCTION core.populate_tenant_from_agent() RETURNS trigger AS $func$ + BEGIN + -- For rows that include agent_internal_id, prefer that lookup + IF NEW.tenant_id IS NULL OR NEW.tenant_id = '' THEN + IF TG_TABLE_NAME = 'agent_tools' OR TG_TABLE_NAME = 'tables' THEN + IF NEW.agent_internal_id IS NOT NULL THEN + SELECT tenant_id INTO NEW.tenant_id FROM core.agents WHERE agent_internal_id = NEW.agent_internal_id LIMIT 1; + END IF; + ELSIF TG_TABLE_NAME = 'columns' THEN + -- columns only reference table_id; derive tenant from core.tables + IF NEW.table_id IS NOT NULL THEN + SELECT tenant_id INTO NEW.tenant_id FROM core.tables WHERE table_id = NEW.table_id LIMIT 1; + END IF; + END IF; + END IF; + RETURN NEW; + END; + $func$ LANGUAGE plpgsql; + $ddl$; + + -- Attach triggers to relevant tables + IF to_regclass('core.agent_tools') IS NOT NULL THEN + EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_agent_tools ON core.agent_tools'; + EXECUTE 'CREATE TRIGGER trg_populate_tenant_agent_tools BEFORE INSERT OR UPDATE ON core.agent_tools FOR EACH ROW EXECUTE FUNCTION core.populate_tenant_from_agent()'; + END IF; + + IF to_regclass('core.tables') IS NOT NULL THEN + EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_tables ON core.tables'; + EXECUTE 'CREATE TRIGGER trg_populate_tenant_tables BEFORE INSERT OR UPDATE ON core.tables FOR EACH ROW EXECUTE FUNCTION core.populate_tenant_from_agent()'; + END IF; + + IF to_regclass('core.columns') IS NOT NULL THEN + EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_columns ON core.columns'; + EXECUTE 'CREATE TRIGGER trg_populate_tenant_columns BEFORE INSERT OR UPDATE ON core.columns FOR EACH ROW EXECUTE FUNCTION core.populate_tenant_from_agent()'; + END IF; + END IF; + IF NOT EXISTS ( SELECT 1 FROM pg_constraint @@ -251,4 +303,49 @@ BEGIN END IF; END IF; + IF to_regclass('core.tables') IS NOT NULL + AND to_regclass('core.columns') IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'fk_core_columns_table' + ) + THEN + ALTER TABLE core.columns + ADD CONSTRAINT fk_core_columns_table + FOREIGN KEY (table_id) + REFERENCES core.tables (table_id) + ON DELETE CASCADE; + END IF; + + IF to_regclass('core.agents') IS NOT NULL + AND to_regclass('core.tables') IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'fk_core_tables_agent' + ) + THEN + ALTER TABLE core.tables + ADD CONSTRAINT fk_core_tables_agent + FOREIGN KEY (agent_internal_id) + REFERENCES core.agents (agent_internal_id) + ON DELETE CASCADE; + END IF; + + IF to_regclass('core.agent_tools') IS NOT NULL + AND to_regclass('core.tables') IS NOT NULL + AND NOT EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'fk_core_tables_agent_tool' + ) + THEN + ALTER TABLE core.tables + ADD CONSTRAINT fk_core_tables_agent_tool + FOREIGN KEY (agent_internal_id, tool_id) + REFERENCES core.agent_tools (agent_internal_id, tool_id) + ON DELETE CASCADE; + END IF; + END $$; diff --git a/tavro_api/api/routers/agents.py b/tavro_api/api/routers/agents.py index 6f4f956..9cd4a4f 100644 --- a/tavro_api/api/routers/agents.py +++ b/tavro_api/api/routers/agents.py @@ -98,7 +98,9 @@ class AgentCreateRequest(BaseModel): role: Optional[str] = None environment: Optional[str] = None owner: Optional[str] = None - tools: Optional[List[Dict[str, str]]] = None + tools: Optional[List[Dict[str, Any]]] = None + tables: Optional[List[Dict[str, Any]]] = None + data_source: Optional[List[Dict[str, Any]]] = None knowledge_source: Optional[Dict[str, str]] = None @@ -184,14 +186,183 @@ def _agent_card_dir() -> Path: return Path(os.getenv("LOCAL_AGENT_CARD_DIR", "./agent_cards")) +def _clean_text(value: Any) -> Optional[str]: + if value is None: + return None + text_value = str(value).strip() + return text_value or None + + +def _column_names(raw_columns: Any) -> List[str]: + if not raw_columns: + return [] + if isinstance(raw_columns, str): + raw_columns = [raw_columns] + if not isinstance(raw_columns, list): + return [] + + names: List[str] = [] + seen: set[str] = set() + for col in raw_columns: + if isinstance(col, dict): + name = _clean_text(col.get("name") or col.get("column_name") or col.get("identifier")) + else: + name = _clean_text(col) + if name and name.lower() not in seen: + seen.add(name.lower()) + names.append(name) + return names + + +def _table_items(raw_tables: Any) -> List[Dict[str, Any]]: + if not raw_tables: + return [] + if isinstance(raw_tables, dict): + raw_tables = [raw_tables] + elif isinstance(raw_tables, str): + raw_tables = [{"name": raw_tables}] + if not isinstance(raw_tables, list): + return [] + + tables: List[Dict[str, Any]] = [] + for raw in raw_tables: + if isinstance(raw, str): + raw = {"name": raw} + if not isinstance(raw, dict): + continue + tables.append({ + "table_id": _clean_text(raw.get("table_id") or raw.get("id") or raw.get("identifier")), + "name": _clean_text(raw.get("name") or raw.get("table_name")), + "columns": _column_names(raw.get("columns") or raw.get("column")), + "tool_name": _clean_text(raw.get("tool_name") or raw.get("tool")), + "tool_id": _clean_text(raw.get("tool_id")), + }) + return tables + + +def _tables_from_tools(tools: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + tables: List[Dict[str, Any]] = [] + for tool in tools or []: + if not isinstance(tool, dict): + continue + tool_name = _clean_text(tool.get("name")) + tool_tables = _table_items(tool.get("tables") or tool.get("table")) + + # Also support the compact shape: + # { "name": "create_incident", "columns": ["id", "status"] } + if not tool_tables and tool.get("columns"): + tool_tables = [{ + "table_id": None, + "name": _clean_text(tool.get("table_name")) or (f"{tool_name} table" if tool_name else None), + "columns": _column_names(tool.get("columns")), + "tool_name": tool_name, + "tool_id": None, + }] + + for table in tool_tables: + table["tool_name"] = table.get("tool_name") or tool_name + tables.append(table) + return tables + + +def _tables_from_data_sources(data_sources: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + table_map: Dict[str, Dict[str, Any]] = {} + for entry in data_sources or []: + if not isinstance(entry, dict): + continue + src_type = str(entry.get("source_object_type") or "").lower() + tgt_type = str(entry.get("target_object_type") or "").lower() + if src_type == "table" and tgt_type == "column": + table_id = _clean_text(entry.get("source_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + { + "table_id": table_id, + "name": _clean_text(entry.get("source_object_name")), + "columns": [], + "tool_name": None, + "tool_id": None, + }, + ) + column_name = _clean_text(entry.get("target_object_name") or entry.get("target_object_id")) + if column_name and column_name not in item["columns"]: + item["columns"].append(column_name) + elif src_type == "tool" and tgt_type == "table": + table_id = _clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + { + "table_id": table_id, + "name": _clean_text(entry.get("target_object_name")), + "columns": [], + "tool_name": None, + "tool_id": None, + }, + ) + item["tool_id"] = _clean_text(entry.get("source_object_id")) + item["tool_name"] = _clean_text(entry.get("source_object_name")) + item["name"] = item.get("name") or _clean_text(entry.get("target_object_name")) + return list(table_map.values()) + + +def _normalize_tables_payload( + tables: Any, + tools: Optional[List[Dict[str, Any]]], + data_sources: Optional[List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + normalized: Dict[str, Dict[str, Any]] = {} + for table in [ + *_table_items(tables), + *_tables_from_tools(tools), + *_tables_from_data_sources(data_sources), + ]: + raw_table_id = table.get("table_id") + table_name = table.get("name") + if raw_table_id: + key = f"id:{raw_table_id}" + elif table_name: + key = f"name:{str(table_name).strip().lower()}" + else: + key = f"anonymous:{len(normalized)}" + item = normalized.setdefault( + key, + { + "table_id": raw_table_id, + "name": table_name, + "columns": [], + "tool_name": table.get("tool_name"), + "tool_id": table.get("tool_id"), + }, + ) + item["table_id"] = item.get("table_id") or raw_table_id + item["name"] = table_name or item.get("name") + item["tool_name"] = table.get("tool_name") or item.get("tool_name") + item["tool_id"] = table.get("tool_id") or item.get("tool_id") + existing_columns = {str(col).strip().lower() for col in item["columns"]} + for column_name in table.get("columns") or []: + column_key = str(column_name).strip().lower() + if column_key and column_key not in existing_columns: + item["columns"].append(column_name) + existing_columns.add(column_key) + + for item in normalized.values(): + item["table_id"] = item.get("table_id") or str(uuid.uuid4()) + return list(normalized.values()) + + def _write_agent_card( agent_id: str, agent_internal_id: str, agent_name: str, description: str, instruction: str, - tools: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, + tables: Optional[List[Dict[str, Any]]] = None, ) -> None: """Write a full agent card JSON file immediately after creation so get_agent_card returns complete details.""" try: @@ -202,7 +373,7 @@ def _write_agent_card( data_source_entries = [] if tools: for tool in tools: - tool_id = str(uuid.uuid4()) + tool_id = tool.get("identifier") or str(uuid.uuid4()) tool_entries.append({ "identifier": tool_id, "name": tool.get("name"), @@ -232,6 +403,45 @@ def _write_agent_card( "uses_pci": None, }) + for table in tables or []: + table_id = table.get("table_id") + table_name = table.get("name") + if not table_id: + continue + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table.get("tool_id") or agent_id, + "source_object_domain": None, + "source_object_name": table.get("tool_name") or agent_name, + "source_object_type": "Tool" if table.get("tool_id") else "Agent", + "target_object_id": table_id, + "target_object_domain": None, + "target_object_name": table_name, + "target_object_type": "Table", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + for column_name in table.get("columns") or []: + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table_id, + "source_object_domain": None, + "source_object_name": table_name, + "source_object_type": "Table", + "target_object_id": column_name, + "target_object_domain": None, + "target_object_name": column_name, + "target_object_type": "Column", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + ks_entry = None if knowledge_source: ks_entry = { @@ -352,8 +562,15 @@ async def create_agent( "environment": body.environment or None}, ) + tool_name_to_id: Dict[str, str] = {} + tools_for_card: List[Dict[str, Any]] = [] for tool in (body.tools or []): tool_id = str(uuid.uuid4()) + tool_name = tool.get("name", "") + tool_name_key = str(tool_name).strip().lower() + if tool_name_key: + tool_name_to_id[tool_name_key] = tool_id + tools_for_card.append({**tool, "identifier": tool_id}) await db.execute( text(f""" INSERT INTO {CORE}.agent_tools @@ -364,9 +581,118 @@ async def create_agent( :tname, :tdesc, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) """), {"tid": tenant_id, "iid": agent_internal_id, "tool_id": tool_id, - "aid": agent_id, "tname": tool.get("name", ""), "tdesc": tool.get("description", "")}, + "aid": agent_id, "tname": tool_name, "tdesc": tool.get("description", "")}, ) + tables_payload = _normalize_tables_payload(body.tables, body.tools, body.data_source) + for table in tables_payload: + tool_name_key = str(table.get("tool_name") or "").strip().lower() + if tool_name_key and not table.get("tool_id"): + table["tool_id"] = tool_name_to_id.get(tool_name_key) + + table_id = table.get("table_id") or str(uuid.uuid4()) + table["table_id"] = table_id + table_name = table.get("name") + table_tool_id = table.get("tool_id") + table_agent_id = None if table_tool_id else agent_id + + await db.execute( + text(f""" + INSERT INTO {CORE}.tables + (tenant_id, agent_internal_id, agent_id, tool_id, table_id, name, created_ts, updated_ts) + VALUES + (:tid, :iid, :table_agent_id, :tool_id, :table_id, :name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (agent_internal_id, table_id) + DO UPDATE SET + agent_id = EXCLUDED.agent_id, + tool_id = EXCLUDED.tool_id, + name = COALESCE(EXCLUDED.name, {CORE}.tables.name), + updated_ts = EXCLUDED.updated_ts + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "table_agent_id": table_agent_id, + "tool_id": table_tool_id, + "table_id": table_id, + "name": table_name, + }, + ) + + if table_tool_id: + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_data_sources ( + tenant_id, agent_internal_id, agent_id, + created_ts, updated_ts, + source_object_id, source_object_name, source_object_type, + target_object_id, target_object_name, target_object_type + ) + VALUES ( + :tid, :iid, :aid, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, + :tool_id, :tool_name, 'Tool', + :table_id, :table_name, 'Table' + ) + ON CONFLICT (agent_internal_id, source_object_id, target_object_id) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts, + source_object_name = EXCLUDED.source_object_name, + target_object_name = EXCLUDED.target_object_name + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "aid": agent_id, + "tool_id": table_tool_id, + "tool_name": table.get("tool_name"), + "table_id": table_id, + "table_name": table_name, + }, + ) + + for column_name in table.get("columns") or []: + await db.execute( + text(f""" + INSERT INTO {CORE}.columns (tenant_id, table_id, name, created_ts, updated_ts) + VALUES (:tid, :table_id, :col_name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (table_id, name) + DO UPDATE SET + tenant_id = EXCLUDED.tenant_id, + updated_ts = EXCLUDED.updated_ts + """), + {"tid": tenant_id, "table_id": table_id, "col_name": column_name}, + ) + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_data_sources ( + tenant_id, agent_internal_id, agent_id, + created_ts, updated_ts, + source_object_id, source_object_name, source_object_type, + target_object_id, target_object_name, target_object_type + ) + VALUES ( + :tid, :iid, :aid, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, + :table_id, :table_name, 'Table', + :column_name, :column_name, 'Column' + ) + ON CONFLICT (agent_internal_id, source_object_id, target_object_id) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts, + source_object_name = EXCLUDED.source_object_name, + target_object_name = EXCLUDED.target_object_name + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "aid": agent_id, + "table_id": table_id, + "table_name": table_name, + "column_name": column_name, + }, + ) + if body.knowledge_source: await db.execute( text(f""" @@ -415,8 +741,9 @@ async def create_agent( agent_name=body.agent_name, description=body.description, instruction=body.instruction, - tools=body.tools, + tools=tools_for_card, knowledge_source=body.knowledge_source, + tables=tables_payload, ) background_tasks.add_task( diff --git a/tavro_app/src/services/agentApi.ts b/tavro_app/src/services/agentApi.ts index 5a6a708..f27bce2 100644 --- a/tavro_app/src/services/agentApi.ts +++ b/tavro_app/src/services/agentApi.ts @@ -47,7 +47,9 @@ export interface AgentCreatePayload { role?: string; environment?: string; owner?: string; - tools?: Array<{ name: string; description: string }>; + tools?: Array<{ name: string; description: string; table?: any; tables?: any[]; columns?: any[] }>; + tables?: Array<{ table_id?: string; name?: string; table_name?: string; columns?: any[]; tool_name?: string; tool_id?: string }>; + data_source?: Array>; knowledge_source?: { name: string; description: string }; } diff --git a/tavro_app/src/services/mcpClient.ts b/tavro_app/src/services/mcpClient.ts index de07090..35ae0c6 100644 --- a/tavro_app/src/services/mcpClient.ts +++ b/tavro_app/src/services/mcpClient.ts @@ -1184,6 +1184,8 @@ Every generated value must be coherent with the blueprint. Do not fabricate data description, instruction, ...(args?.tools ? { tools: args.tools } : {}), + ...(args?.tables ? { tables: args.tables } : {}), + ...(args?.data_source ? { data_source: args.data_source } : {}), ...(args?.knowledge_source ? { knowledge_source: args.knowledge_source } : {}), ...(args?.original_prompt ? { original_prompt: args.original_prompt } : {}), }; diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index a39ba9e..8ac80ef 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -525,6 +525,161 @@ def _send(): @staticmethod def sanitize(val: str) -> str: return val.replace("'", "''") if val else val + + @staticmethod + def _clean_text(value: Optional[Any]) -> Optional[str]: + if value is None: + return None + text_value = str(value).strip() + return text_value or None + + @classmethod + def _column_names(cls, raw_columns: Any) -> List[str]: + if not raw_columns: + return [] + if isinstance(raw_columns, str): + raw_columns = [raw_columns] + if not isinstance(raw_columns, list): + return [] + + names: List[str] = [] + seen = set() + for col in raw_columns: + if isinstance(col, dict): + name = cls._clean_text(col.get("name") or col.get("column_name") or col.get("identifier")) + else: + name = cls._clean_text(col) + if name and name.lower() not in seen: + seen.add(name.lower()) + names.append(name) + return names + + @classmethod + def _table_items(cls, raw_tables: Any) -> List[Dict[str, Any]]: + if not raw_tables: + return [] + if isinstance(raw_tables, dict): + raw_tables = [raw_tables] + elif isinstance(raw_tables, str): + raw_tables = [{"name": raw_tables}] + if not isinstance(raw_tables, list): + return [] + + tables: List[Dict[str, Any]] = [] + for raw in raw_tables: + if isinstance(raw, str): + raw = {"name": raw} + if not isinstance(raw, dict): + continue + tables.append({ + "table_id": cls._clean_text(raw.get("table_id") or raw.get("id") or raw.get("identifier")), + "name": cls._clean_text(raw.get("name") or raw.get("table_name")), + "columns": cls._column_names(raw.get("columns") or raw.get("column")), + "tool_name": cls._clean_text(raw.get("tool_name") or raw.get("tool")), + "tool_id": cls._clean_text(raw.get("tool_id")), + }) + return tables + + @classmethod + def _tables_from_tools(cls, tools: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + tables: List[Dict[str, Any]] = [] + for tool in tools or []: + if not isinstance(tool, dict): + continue + tool_name = cls._clean_text(tool.get("name")) + tool_tables = cls._table_items(tool.get("tables") or tool.get("table")) + + if not tool_tables and tool.get("columns"): + tool_tables = [{ + "table_id": None, + "name": cls._clean_text(tool.get("table_name")) or (f"{tool_name} table" if tool_name else None), + "columns": cls._column_names(tool.get("columns")), + "tool_name": tool_name, + "tool_id": None, + }] + + for table in tool_tables: + table["tool_name"] = table.get("tool_name") or tool_name + tables.append(table) + return tables + + @classmethod + def _tables_from_data_sources(cls, data_sources: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + table_map: Dict[str, Dict[str, Any]] = {} + for entry in data_sources or []: + if not isinstance(entry, dict): + continue + src_type = str(entry.get("source_object_type") or "").lower() + tgt_type = str(entry.get("target_object_type") or "").lower() + if src_type == "table" and tgt_type == "column": + table_id = cls._clean_text(entry.get("source_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + {"table_id": table_id, "name": cls._clean_text(entry.get("source_object_name")), "columns": [], "tool_name": None, "tool_id": None}, + ) + column_name = cls._clean_text(entry.get("target_object_name") or entry.get("target_object_id")) + if column_name and column_name not in item["columns"]: + item["columns"].append(column_name) + elif src_type == "tool" and tgt_type == "table": + table_id = cls._clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + {"table_id": table_id, "name": cls._clean_text(entry.get("target_object_name")), "columns": [], "tool_name": None, "tool_id": None}, + ) + item["tool_id"] = cls._clean_text(entry.get("source_object_id")) + item["tool_name"] = cls._clean_text(entry.get("source_object_name")) + item["name"] = item.get("name") or cls._clean_text(entry.get("target_object_name")) + return list(table_map.values()) + + @classmethod + def _normalize_tables_payload( + cls, + tables: Any, + tools: Optional[List[Dict[str, Any]]], + data_sources: Optional[List[Dict[str, Any]]], + ) -> List[Dict[str, Any]]: + normalized: Dict[str, Dict[str, Any]] = {} + for table in [ + *cls._table_items(tables), + *cls._tables_from_tools(tools), + *cls._tables_from_data_sources(data_sources), + ]: + raw_table_id = table.get("table_id") + table_name = table.get("name") + if raw_table_id: + key = f"id:{raw_table_id}" + elif table_name: + key = f"name:{str(table_name).strip().lower()}" + else: + key = f"anonymous:{len(normalized)}" + item = normalized.setdefault( + key, + { + "table_id": raw_table_id, + "name": table_name, + "columns": [], + "tool_name": table.get("tool_name"), + "tool_id": table.get("tool_id"), + }, + ) + item["table_id"] = item.get("table_id") or raw_table_id + item["name"] = table_name or item.get("name") + item["tool_name"] = table.get("tool_name") or item.get("tool_name") + item["tool_id"] = table.get("tool_id") or item.get("tool_id") + existing_columns = {str(col).strip().lower() for col in item["columns"]} + for column_name in table.get("columns") or []: + column_key = str(column_name).strip().lower() + if column_key and column_key not in existing_columns: + item["columns"].append(column_name) + existing_columns.add(column_key) + + for item in normalized.values(): + item["table_id"] = item.get("table_id") or str(uuid.uuid4()) + return list(normalized.values()) @staticmethod def _normalize_tenant_id(value: Optional[Any]) -> Optional[str]: @@ -655,9 +810,10 @@ def _write_agent_card( agent_name: str, description: str, instruction: str, - tools: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tool_ids: Optional[List[str]] = None, + tables: Optional[List[Dict[str, Any]]] = None, ) -> None: """Write a full agent card JSON file immediately after creation so get_agent_card returns complete details.""" try: @@ -697,6 +853,45 @@ def _write_agent_card( "uses_pci": None, }) + for table in tables or []: + table_id = table.get("table_id") + table_name = table.get("name") + if not table_id: + continue + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table.get("tool_id") or agent_id, + "source_object_domain": None, + "source_object_name": table.get("tool_name") or agent_name, + "source_object_type": "Tool" if table.get("tool_id") else "Agent", + "target_object_id": table_id, + "target_object_domain": None, + "target_object_name": table_name, + "target_object_type": "Table", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + for column_name in table.get("columns") or []: + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table_id, + "source_object_domain": None, + "source_object_name": table_name, + "source_object_type": "Table", + "target_object_id": column_name, + "target_object_domain": None, + "target_object_name": column_name, + "target_object_type": "Column", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + ks_entry = None if knowledge_source: ks_entry = { @@ -777,7 +972,9 @@ def create_agent( agent_name: str, description: str, instruction: str, - tools: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, + tables: Optional[List[Dict[str, Any]]] = None, + data_source: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tenant_id: Optional[str] = None )-> Dict[str, Any]: @@ -798,7 +995,11 @@ def create_agent( queries = [] data_source_values = [] + table_values = [] + column_values = [] tool_ids_for_card: List[str] = [] + tool_name_to_id: Dict[str, str] = {} + tables_payload = cls._normalize_tables_payload(tables, tools, data_source) # 1. agents table tenant_id_value = f"'{tenant_id}'," if tenant_id else "" @@ -856,6 +1057,8 @@ def create_agent( tool_ids_for_card.append(tool_id) name = cls.sanitize(tool.get("name")) desc = cls.sanitize(tool.get("description")) + if name: + tool_name_to_id[str(tool.get("name")).strip().lower()] = tool_id values_list.append(f""" ( @@ -902,6 +1105,116 @@ def create_agent( """ queries.append(tools_query) + for table in tables_payload: + tool_name_key = str(table.get("tool_name") or "").strip().lower() + if tool_name_key and not table.get("tool_id"): + table["tool_id"] = tool_name_to_id.get(tool_name_key) + + table_id = cls.sanitize(table.get("table_id") or str(uuid.uuid4())) + table["table_id"] = table_id + table_name = cls.sanitize(table.get("name") or "") + table_tool_id = cls.sanitize(table.get("tool_id") or "") + table_tool_name = cls.sanitize(table.get("tool_name") or "") + table_agent_id_value = "NULL" if table_tool_id else f"'{agent_id}'" + table_tool_id_value = f"'{table_tool_id}'" if table_tool_id else "NULL" + + table_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + {table_agent_id_value}, + {table_tool_id_value}, + '{table_id}', + '{table_name}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + + if table_tool_id: + data_source_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + '{agent_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}', + '{table_tool_id}', + '{table_tool_name}', + 'Tool', + '{table_id}', + '{table_name}', + 'Table' + ) + """) + + for column_name in table.get("columns") or []: + clean_column = cls.sanitize(column_name) + if not clean_column: + continue + column_values.append(f""" + ( + {tenant_id_value} + '{table_id}', + '{clean_column}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + data_source_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + '{agent_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}', + '{table_id}', + '{table_name}', + 'Table', + '{clean_column}', + '{clean_column}', + 'Column' + ) + """) + + if table_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.tables ( + {tenant_id_column} + agent_internal_id, + agent_id, + tool_id, + table_id, + name, + created_ts, + updated_ts + ) + VALUES + {",".join(table_values)} + ON CONFLICT (agent_internal_id, table_id) + DO UPDATE SET + agent_id = EXCLUDED.agent_id, + tool_id = EXCLUDED.tool_id, + name = COALESCE(EXCLUDED.name, {cls.CORE_DB_NAME}.tables.name), + updated_ts = EXCLUDED.updated_ts + """) + + if column_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.columns ( + {tenant_id_column} + table_id, + name, + created_ts, + updated_ts + ) + VALUES + {",".join(column_values)} + ON CONFLICT (table_id, name) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts + """) + # 4. knowledge sources (ONLY name + description) if knowledge_source: ks_name = cls.sanitize(knowledge_source.get("name")) @@ -961,6 +1274,7 @@ def create_agent( tools=tools, knowledge_source=knowledge_source, tool_ids=tool_ids_for_card, + tables=tables_payload, ) payload = { From 332f6ecf352a2fba40249b590dadf465528a801d Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Thu, 4 Jun 2026 17:55:21 +0530 Subject: [PATCH 2/7] Added the relation tables for the agent to table, table to columns and tool to tables --- services/upload_processor.py | 63 ++++++++++++++- sql/core/agent_tables.sql | 10 +++ sql/core/table_columns.sql | 8 ++ sql/core/tool_tables.sql | 11 +++ sql/core/zz_agent_upsert_unique_indexes.sql | 9 +++ tavro_api/api/routers/agents.py | 57 +++++++++++++ tavro_library/agent_library.py | 88 +++++++++++++++++++++ 7 files changed, 245 insertions(+), 1 deletion(-) create mode 100644 sql/core/agent_tables.sql create mode 100644 sql/core/table_columns.sql create mode 100644 sql/core/tool_tables.sql diff --git a/services/upload_processor.py b/services/upload_processor.py index 3c60bb9..a74269b 100644 --- a/services/upload_processor.py +++ b/services/upload_processor.py @@ -371,7 +371,7 @@ def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): if str(src_type).lower() == "agent": tables[tid]["agent_id"] = ds.get("source_object_id") - # Second pass: if a Tool -> Table relation exists, capture tool_id and drop agent_id + # Second pass: if a Tool -> Table relation exists, capture tool_id/tool_name and drop agent_id for ds in data_sources: src_type = ds.get("source_object_type") tgt_type = ds.get("target_object_type") @@ -380,17 +380,25 @@ def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): table_id = ds.get("target_object_id") if table_id and tool_id and table_id in tables: tables[table_id]["tool_id"] = tool_id + tables[table_id]["tool_name"] = ds.get("source_object_name") or "" tables[table_id].pop("agent_id", None) if not tables: return + agent_name = card.get("name") or "" select_rows = [] + agent_table_rows = [] + tool_table_rows = [] for tid, meta in tables.items(): name = meta.get("name") tool_id = meta.get("tool_id") + tool_name = meta.get("tool_name") or "" table_agent_id = meta.get("agent_id") select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(table_agent_id)} AS agent_id, {_sq(tool_id)} AS tool_id, {_sq(tid)} AS table_id, {_sq(name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + agent_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_id)} AS agent_id, {_sq(agent_name)} AS agent_name, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, TIMESTAMP '{now_str}' AS now_ts") + if tool_id: + tool_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(tool_id)} AS tool_id, {_sq(tool_name)} AS tool_name, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, {_sq(agent_id)} AS agent_id, {_sq(agent_internal_id)} AS agent_internal_id, TIMESTAMP '{now_str}' AS now_ts") union_all = "\nUNION ALL\n".join(select_rows) _exec(conn, f""" @@ -410,6 +418,43 @@ def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): updated_ts = EXCLUDED.updated_ts """, f"tables upsert ({len(select_rows)})") + if agent_table_rows: + ua = "\nUNION ALL\n".join(agent_table_rows) + _exec(conn, f""" + INSERT INTO {CORE}.agent_tables ( + tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts + ) + SELECT tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, now_ts, now_ts + FROM ({ua}) AS s + WHERE agent_id IS NOT NULL AND agent_id <> '' + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = EXCLUDED.agent_name, + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, {CORE}.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """, f"agent_tables upsert ({len(agent_table_rows)})") + + if tool_table_rows: + ut = "\nUNION ALL\n".join(tool_table_rows) + _exec(conn, f""" + INSERT INTO {CORE}.tool_tables ( + tenant_id, tool_id, tool_name, table_id, table_name, + agent_id, agent_internal_id, created_ts, updated_ts + ) + SELECT tenant_id, tool_id, tool_name, table_id, table_name, + agent_id, agent_internal_id, now_ts, now_ts + FROM ({ut}) AS s + WHERE tool_id IS NOT NULL AND tool_id <> '' + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, {CORE}.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, {CORE}.tool_tables.table_name), + agent_id = EXCLUDED.agent_id, + agent_internal_id = EXCLUDED.agent_internal_id, + updated_ts = EXCLUDED.updated_ts + """, f"tool_tables upsert ({len(tool_table_rows)})") + # --------------------------------------------------------------------------- # Step Y — core.columns (tables -> columns) @@ -428,12 +473,15 @@ def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): # Columns are represented as relationships where source is Table and target is Column select_rows = [] + tc_rows = [] for ds in data_sources: if str(ds.get("source_object_type")).lower() == "table" and str(ds.get("target_object_type")).lower() == "column": table_id = ds.get("source_object_id") + table_name = ds.get("source_object_name") or "" col_name = ds.get("target_object_name") if table_id and col_name: select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(col_name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + tc_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(table_name)} AS table_name, {_sq(col_name)} AS column_name, TIMESTAMP '{now_str}' AS now_ts") if not select_rows: return @@ -450,6 +498,19 @@ def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): updated_ts = EXCLUDED.updated_ts """, f"columns upsert ({len(select_rows)})") + if tc_rows: + utc = "\nUNION ALL\n".join(tc_rows) + _exec(conn, f""" + INSERT INTO {CORE}.table_columns ( + tenant_id, table_id, table_name, column_name, created_ts, updated_ts + ) + SELECT tenant_id, table_id, table_name, column_name, now_ts, now_ts + FROM ({utc}) AS s + ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET + table_name = COALESCE(EXCLUDED.table_name, {CORE}.table_columns.table_name), + updated_ts = EXCLUDED.updated_ts + """, f"table_columns upsert ({len(tc_rows)})") + # --------------------------------------------------------------------------- # Step 5 — core.agent_controls diff --git a/sql/core/agent_tables.sql b/sql/core/agent_tables.sql new file mode 100644 index 0000000..3af0991 --- /dev/null +++ b/sql/core/agent_tables.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS core.agent_tables ( + tenant_id TEXT, + agent_id TEXT, + agent_name TEXT, + agent_internal_id TEXT, + table_id TEXT, + table_name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/table_columns.sql b/sql/core/table_columns.sql new file mode 100644 index 0000000..ff563d1 --- /dev/null +++ b/sql/core/table_columns.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS core.table_columns ( + tenant_id TEXT, + table_id TEXT, + table_name TEXT, + column_name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/tool_tables.sql b/sql/core/tool_tables.sql new file mode 100644 index 0000000..7107412 --- /dev/null +++ b/sql/core/tool_tables.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS core.tool_tables ( + tenant_id TEXT, + tool_id TEXT, + tool_name TEXT, + table_id TEXT, + table_name TEXT, + agent_id TEXT, + agent_internal_id TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/zz_agent_upsert_unique_indexes.sql b/sql/core/zz_agent_upsert_unique_indexes.sql index 09f2ea0..4ef0ad2 100644 --- a/sql/core/zz_agent_upsert_unique_indexes.sql +++ b/sql/core/zz_agent_upsert_unique_indexes.sql @@ -70,6 +70,15 @@ ON core.tables (agent_internal_id, table_id); CREATE INDEX IF NOT EXISTS ix_core_tables_agent_tool ON core.tables (agent_internal_id, tool_id); +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_tool_tables +ON core.tool_tables (tenant_id, tool_id, table_id); + +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agent_tables +ON core.agent_tables (tenant_id, agent_id, table_id); + +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_table_columns +ON core.table_columns (tenant_id, table_id, column_name); + DO $$ BEGIN IF to_regclass('core.agent_ai_use_cases') IS NOT NULL THEN diff --git a/tavro_api/api/routers/agents.py b/tavro_api/api/routers/agents.py index 9cd4a4f..c5e4897 100644 --- a/tavro_api/api/routers/agents.py +++ b/tavro_api/api/routers/agents.py @@ -619,6 +619,24 @@ async def create_agent( }, ) + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_tables + (tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts) + VALUES + (:tid, :aid, :aname, :iid, :table_id, :table_name, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = EXCLUDED.agent_name, + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, {CORE}.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """), + {"tid": tenant_id, "aid": agent_id, "aname": body.agent_name, + "iid": agent_internal_id, "table_id": table_id, "table_name": table_name}, + ) + if table_tool_id: await db.execute( text(f""" @@ -650,6 +668,31 @@ async def create_agent( "table_name": table_name, }, ) + await db.execute( + text(f""" + INSERT INTO {CORE}.tool_tables + (tenant_id, tool_id, tool_name, table_id, table_name, + agent_id, agent_internal_id, created_ts, updated_ts) + VALUES + (:tid, :tool_id, :tool_name, :table_id, :table_name, + :aid, :iid, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, {CORE}.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, {CORE}.tool_tables.table_name), + agent_id = EXCLUDED.agent_id, + agent_internal_id = EXCLUDED.agent_internal_id, + updated_ts = EXCLUDED.updated_ts + """), + { + "tid": tenant_id, + "tool_id": table_tool_id, + "tool_name": table.get("tool_name"), + "table_id": table_id, + "table_name": table_name, + "aid": agent_id, + "iid": agent_internal_id, + }, + ) for column_name in table.get("columns") or []: await db.execute( @@ -663,6 +706,20 @@ async def create_agent( """), {"tid": tenant_id, "table_id": table_id, "col_name": column_name}, ) + await db.execute( + text(f""" + INSERT INTO {CORE}.table_columns + (tenant_id, table_id, table_name, column_name, created_ts, updated_ts) + VALUES + (:tid, :table_id, :table_name, :column_name, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET + table_name = COALESCE(EXCLUDED.table_name, {CORE}.table_columns.table_name), + updated_ts = EXCLUDED.updated_ts + """), + {"tid": tenant_id, "table_id": table_id, + "table_name": table_name, "column_name": column_name}, + ) await db.execute( text(f""" INSERT INTO {CORE}.agent_data_sources ( diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index 8ac80ef..8340e05 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -997,6 +997,9 @@ def create_agent( data_source_values = [] table_values = [] column_values = [] + agent_table_values = [] + tool_table_values = [] + table_column_values = [] tool_ids_for_card: List[str] = [] tool_name_to_id: Dict[str, str] = {} tables_payload = cls._normalize_tables_payload(tables, tools, data_source) @@ -1131,6 +1134,20 @@ def create_agent( ) """) + # agent_tables relationship + agent_table_values.append(f""" + ( + {tenant_id_value} + '{agent_id}', + '{agent_name}', + '{agent_internal_id}', + '{table_id}', + '{table_name}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + if table_tool_id: data_source_values.append(f""" ( @@ -1147,6 +1164,20 @@ def create_agent( 'Table' ) """) + # tool_tables relationship + tool_table_values.append(f""" + ( + {tenant_id_value} + '{table_tool_id}', + '{table_tool_name}', + '{table_id}', + '{table_name}', + '{agent_id}', + '{agent_internal_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) for column_name in table.get("columns") or []: clean_column = cls.sanitize(column_name) @@ -1161,6 +1192,17 @@ def create_agent( TIMESTAMP '{now}' ) """) + # table_columns relationship + table_column_values.append(f""" + ( + {tenant_id_value} + '{table_id}', + '{table_name}', + '{clean_column}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) data_source_values.append(f""" ( {tenant_id_value} @@ -1215,6 +1257,52 @@ def create_agent( updated_ts = EXCLUDED.updated_ts """) + if agent_table_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.agent_tables ( + {tenant_id_column} + agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts + ) + VALUES + {",".join(agent_table_values)} + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = EXCLUDED.agent_name, + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """) + + if tool_table_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.tool_tables ( + {tenant_id_column} + tool_id, tool_name, table_id, table_name, + agent_id, agent_internal_id, created_ts, updated_ts + ) + VALUES + {",".join(tool_table_values)} + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, {cls.CORE_DB_NAME}.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.tool_tables.table_name), + agent_id = EXCLUDED.agent_id, + agent_internal_id = EXCLUDED.agent_internal_id, + updated_ts = EXCLUDED.updated_ts + """) + + if table_column_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.table_columns ( + {tenant_id_column} + table_id, table_name, column_name, created_ts, updated_ts + ) + VALUES + {",".join(table_column_values)} + ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET + table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.table_columns.table_name), + updated_ts = EXCLUDED.updated_ts + """) + # 4. knowledge sources (ONLY name + description) if knowledge_source: ks_name = cls.sanitize(knowledge_source.get("name")) From 82e3e5194c3f288af5d3144c6406fa618a96f0dc Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Thu, 4 Jun 2026 18:48:46 +0530 Subject: [PATCH 3/7] fixed the update issues --- mcp_server/server.py | 15 ++++- tavro_library/agent_library.py | 109 ++++++++++++++++++++++++++++++++- 2 files changed, 119 insertions(+), 5 deletions(-) diff --git a/mcp_server/server.py b/mcp_server/server.py index db871c8..821e296 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -754,12 +754,12 @@ async def remove_ai_use_case_agent_relationship(original_prompt: str, *, agent_c return {"error": "INTERNAL_ERROR", "details": str(e)} @core.tool(name="update_agent") -async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None) -> Dict[str, Any]: +async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: """ Update an existing AI agent’s configuration. Allows modification of agent metadata such as name, description, - behavior instructions, tools, and knowledge sources. + behavior instructions, tools, knowledge sources, and tables. Args: original_prompt (str): REQUIRED. Exact user message verbatim. @@ -771,6 +771,15 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, confirmed to exist. Describe inter-agent dependencies generically if unknown. tools (Optional[List[Dict[str, str]]]): Updated tool list. knowledge_source (Optional[Dict[str, str]]): Updated knowledge source. + tables (Optional[List[Dict[str, Any]]]): Tables to rename or update. Each entry must include + the new name and a way to identify the existing table: + { + "name": str, # new table name to set + "old_name": str, # current table name (use when table_id is unknown) + "table_id": str # table identifier (preferred when available) + } + Use "old_name" when the user refers to the current name (e.g. "rename + SNOW_incident to Incidents"). Use "table_id" when you already know it. Returns: Dict[str, Any]: Updated agent metadata or error response. @@ -791,6 +800,7 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, "instruction": instruction, "tools": tools, "knowledge_source": knowledge_source, + "tables": tables, }, tenant_id, ) @@ -802,6 +812,7 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, instruction=instruction, tools=tools, knowledge_source=knowledge_source, + tables=tables, tenant_id=str(tenant_id), ) diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index 8340e05..627ab8b 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -394,6 +394,44 @@ def get_agent_card(cls, agent_name: Optional[str] = None, agent_id: Optional[str except Exception as ra_overlay_err: print(f"[get_agent_card] Risk assessment overlay failed (returning card as-is): {ra_overlay_err}") + # Overlay data_source from DB so renames and new relationships are + # immediately visible in the UI lineage without regenerating the card file. + try: + ds_rows = cls.execute_select( + f""" + SELECT relationship_id, parent_relationship_id, + source_object_id, source_object_domain, source_object_name, source_object_type, + target_object_id, target_object_domain, target_object_name, target_object_type, + access_level, contains_pii, contains_phi, contains_pci + FROM {cls.CORE_DB_NAME}.agent_data_sources + WHERE agent_id = %s + ORDER BY created_ts NULLS LAST + """, + (agent_id_clean,), + ) + if ds_rows: + local_card["data_source"] = [ + { + "relationship_id": r.get("relationship_id"), + "parent_relationship_id": r.get("parent_relationship_id"), + "source_object_id": r.get("source_object_id"), + "source_object_domain": r.get("source_object_domain"), + "source_object_name": r.get("source_object_name"), + "source_object_type": r.get("source_object_type"), + "target_object_id": r.get("target_object_id"), + "target_object_domain": r.get("target_object_domain"), + "target_object_name": r.get("target_object_name"), + "target_object_type": r.get("target_object_type"), + "access_level": r.get("access_level"), + "uses_pii": r.get("contains_pii"), + "uses_phi": r.get("contains_phi"), + "uses_pci": r.get("contains_pci"), + } + for r in ds_rows + ] + except Exception as ds_overlay_err: + print(f"[get_agent_card] Data source overlay failed (returning card as-is): {ds_overlay_err}") + return local_card # ---------- 7. Not found ---------- @@ -2095,6 +2133,7 @@ def update_agent( instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, + tables: Optional[List[Dict[str, Any]]] = None, tenant_id: Optional[str] = None ) -> Dict[str, Any]: """ @@ -2119,10 +2158,10 @@ def update_agent( agent_id = cls.sanitize(str(agent_id).strip()) # Fetch agent info (1 query) - rows = cls.execute_select(f"SELECT agent_internal_id FROM {cls.CORE_DB_NAME}.agents WHERE agent_id = '{agent_id}' AND is_current = true {tenant_where} LIMIT 1") + rows = cls.execute_select(f"SELECT agent_internal_id, agent_name FROM {cls.CORE_DB_NAME}.agents WHERE agent_id = '{agent_id}' AND is_current = true {tenant_where} LIMIT 1") if not rows: raise ValueError(f"Agent '{agent_id}' not found.") - + agent_internal_id = rows[0].get("agent_internal_id") now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') @@ -2152,7 +2191,71 @@ def update_agent( ks_desc = cls.sanitize(knowledge_source.get("description", "")) cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_knowledge_sources ({tenant_col}agent_internal_id, agent_id, name, description, created_ts, updated_ts) VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', '{ks_name}', '{ks_desc}', TIMESTAMP '{now}', TIMESTAMP '{now}')") - return {"message": "Agent updated successfully.", "agent_id": agent_id} + tables_updated = 0 + for table in (tables or []): + if not isinstance(table, dict): + continue + new_name = cls.sanitize(str(table.get("name") or "").strip()) + if not new_name: + continue + + # Resolve table_id: prefer explicit table_id, fall back to old_name or current name lookup + table_id = cls.sanitize(str(table.get("table_id") or "").strip()) + if not table_id: + lookup_name = cls.sanitize(str(table.get("old_name") or table.get("name") or "").strip()) + if lookup_name: + found = cls.execute_select( + f"SELECT table_id FROM {cls.CORE_DB_NAME}.tables " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND LOWER(name) = LOWER('{lookup_name}') LIMIT 1" + ) + if found: + table_id = cls.sanitize(str(found[0].get("table_id") or "").strip()) + + if not table_id: + continue + + # Update core.tables + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.tables SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}' AND agent_internal_id = '{agent_internal_id}'" + ) + # Propagate new name to all three relationship tables + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}' AND agent_id = '{agent_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.tool_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.table_columns SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + # Propagate new name to agent_data_sources (used by the UI lineage view) + # Table appears as a target in Tool→Table edges + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET target_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND target_object_id = '{table_id}' " + f"AND LOWER(target_object_type) = 'table'" + ) + # Table appears as a source in Table→Column edges + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET source_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND source_object_id = '{table_id}' " + f"AND LOWER(source_object_type) = 'table'" + ) + tables_updated += 1 + + msg = "Agent updated successfully." + if tables_updated: + msg += f" {tables_updated} table(s) renamed." + return {"message": msg, "agent_id": agent_id} @classmethod def delete_agent( From ef5bad4ebe083876da65df29a9949e5c0f328972 Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Fri, 5 Jun 2026 14:38:31 +0530 Subject: [PATCH 4/7] updated the ddl's removed the relation table ids in the master tables, added the update functionality for the columns --- mcp_server/server.py | 15 +- services/upload_processor.py | 44 ++--- sql/core/columns.sql | 2 +- sql/core/table_columns.sql | 1 + sql/core/tables.sql | 3 - sql/core/tool_tables.sql | 16 +- sql/core/zz_agent_upsert_unique_indexes.sql | 181 ++++++++++++++------ tavro_api/api/routers/agents.py | 42 ++--- tavro_library/agent_library.py | 82 ++++++--- 9 files changed, 247 insertions(+), 139 deletions(-) diff --git a/mcp_server/server.py b/mcp_server/server.py index 821e296..71c1edd 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -754,12 +754,12 @@ async def remove_ai_use_case_agent_relationship(original_prompt: str, *, agent_c return {"error": "INTERNAL_ERROR", "details": str(e)} @core.tool(name="update_agent") -async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: +async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None, columns: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: """ Update an existing AI agent’s configuration. Allows modification of agent metadata such as name, description, - behavior instructions, tools, knowledge sources, and tables. + behavior instructions, tools, knowledge sources, tables, and columns. Args: original_prompt (str): REQUIRED. Exact user message verbatim. @@ -780,6 +780,15 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, } Use "old_name" when the user refers to the current name (e.g. "rename SNOW_incident to Incidents"). Use "table_id" when you already know it. + columns (Optional[List[Dict[str, Any]]]): Columns to rename. Each entry must include + the new name and a way to identify the existing column: + { + "name": str, # new column name to set + "old_name": str, # current column name (required) + "table_id": str # table the column belongs to (preferred for precision) + } + Use "old_name" for the current column name (e.g. "rename col_id to incident_id"). + Provide "table_id" when the same column name exists in multiple tables. Returns: Dict[str, Any]: Updated agent metadata or error response. @@ -801,6 +810,7 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, "tools": tools, "knowledge_source": knowledge_source, "tables": tables, + "columns": columns, }, tenant_id, ) @@ -813,6 +823,7 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, tools=tools, knowledge_source=knowledge_source, tables=tables, + columns=columns, tenant_id=str(tenant_id), ) diff --git a/services/upload_processor.py b/services/upload_processor.py index a74269b..ae52994 100644 --- a/services/upload_processor.py +++ b/services/upload_processor.py @@ -329,7 +329,7 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): # --------------------------------------------------------------------------- -# Step X — core.tables (agent -> tables, tools -> tables) +# Step X — core.tables (table catalog) # --------------------------------------------------------------------------- def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): ident = card.get("identification", {}) @@ -344,7 +344,7 @@ def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): if rows and rows[0].get("tenant_id"): tenant_id = rows[0].get("tenant_id") - # Collect table records: table_id -> {name, tool_id} + # Collect table records; tool_id is used only to populate tool_tables below. tables = {} # First pass: find explicit table nodes (as source or target) # Also capture direct agent->table ownership when present. @@ -394,27 +394,21 @@ def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): name = meta.get("name") tool_id = meta.get("tool_id") tool_name = meta.get("tool_name") or "" - table_agent_id = meta.get("agent_id") - select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(table_agent_id)} AS agent_id, {_sq(tool_id)} AS tool_id, {_sq(tid)} AS table_id, {_sq(name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(tid)} AS table_id, {_sq(name)} AS name, TIMESTAMP '{now_str}' AS now_ts") agent_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_id)} AS agent_id, {_sq(agent_name)} AS agent_name, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, TIMESTAMP '{now_str}' AS now_ts") if tool_id: - tool_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(tool_id)} AS tool_id, {_sq(tool_name)} AS tool_name, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, {_sq(agent_id)} AS agent_id, {_sq(agent_internal_id)} AS agent_internal_id, TIMESTAMP '{now_str}' AS now_ts") + tool_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(tool_id)} AS tool_id, {_sq(tool_name)} AS tool_name, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, TIMESTAMP '{now_str}' AS now_ts") union_all = "\nUNION ALL\n".join(select_rows) _exec(conn, f""" INSERT INTO {CORE}.tables ( - tenant_id, agent_internal_id, agent_id, tool_id, table_id, name, created_ts, updated_ts + tenant_id, table_id, name, created_ts, updated_ts ) - SELECT tenant_id, agent_internal_id, agent_id, tool_id, table_id, name, now_ts, now_ts + SELECT tenant_id, table_id, name, now_ts, now_ts FROM ({union_all}) AS s - ON CONFLICT (agent_internal_id, table_id) + ON CONFLICT (table_id) DO UPDATE SET - agent_id = CASE - WHEN EXCLUDED.tool_id IS NOT NULL THEN NULL - ELSE COALESCE(EXCLUDED.agent_id, {CORE}.tables.agent_id) - END, name = COALESCE(EXCLUDED.name, {CORE}.tables.name), - tool_id = COALESCE(EXCLUDED.tool_id, {CORE}.tables.tool_id), updated_ts = EXCLUDED.updated_ts """, f"tables upsert ({len(select_rows)})") @@ -441,17 +435,15 @@ def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): _exec(conn, f""" INSERT INTO {CORE}.tool_tables ( tenant_id, tool_id, tool_name, table_id, table_name, - agent_id, agent_internal_id, created_ts, updated_ts + created_ts, updated_ts ) SELECT tenant_id, tool_id, tool_name, table_id, table_name, - agent_id, agent_internal_id, now_ts, now_ts + now_ts, now_ts FROM ({ut}) AS s WHERE tool_id IS NOT NULL AND tool_id <> '' ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET tool_name = COALESCE(EXCLUDED.tool_name, {CORE}.tool_tables.tool_name), table_name = COALESCE(EXCLUDED.table_name, {CORE}.tool_tables.table_name), - agent_id = EXCLUDED.agent_id, - agent_internal_id = EXCLUDED.agent_internal_id, updated_ts = EXCLUDED.updated_ts """, f"tool_tables upsert ({len(tool_table_rows)})") @@ -479,9 +471,10 @@ def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): table_id = ds.get("source_object_id") table_name = ds.get("source_object_name") or "" col_name = ds.get("target_object_name") - if table_id and col_name: - select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(col_name)} AS name, TIMESTAMP '{now_str}' AS now_ts") - tc_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(table_name)} AS table_name, {_sq(col_name)} AS column_name, TIMESTAMP '{now_str}' AS now_ts") + column_id = ds.get("target_object_id") or col_name + if table_id and col_name and column_id: + select_rows.append(f"SELECT {_sq(column_id)} AS column_id, {_sq(tenant_id)} AS tenant_id, {_sq(col_name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + tc_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(table_name)} AS table_name, {_sq(col_name)} AS column_name, {_sq(column_id)} AS column_id, TIMESTAMP '{now_str}' AS now_ts") if not select_rows: return @@ -489,11 +482,11 @@ def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): union_all = "\nUNION ALL\n".join(select_rows) _exec(conn, f""" INSERT INTO {CORE}.columns ( - tenant_id, table_id, name, created_ts, updated_ts + column_id, tenant_id, name, created_ts, updated_ts ) - SELECT tenant_id, table_id, name, now_ts, now_ts + SELECT column_id, tenant_id, name, now_ts, now_ts FROM ({union_all}) AS s - ON CONFLICT (table_id, name) + ON CONFLICT (column_id) DO UPDATE SET updated_ts = EXCLUDED.updated_ts """, f"columns upsert ({len(select_rows)})") @@ -502,12 +495,13 @@ def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): utc = "\nUNION ALL\n".join(tc_rows) _exec(conn, f""" INSERT INTO {CORE}.table_columns ( - tenant_id, table_id, table_name, column_name, created_ts, updated_ts + tenant_id, table_id, table_name, column_name, column_id, created_ts, updated_ts ) - SELECT tenant_id, table_id, table_name, column_name, now_ts, now_ts + SELECT tenant_id, table_id, table_name, column_name, column_id, now_ts, now_ts FROM ({utc}) AS s ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET table_name = COALESCE(EXCLUDED.table_name, {CORE}.table_columns.table_name), + column_id = COALESCE(EXCLUDED.column_id, {CORE}.table_columns.column_id), updated_ts = EXCLUDED.updated_ts """, f"table_columns upsert ({len(tc_rows)})") diff --git a/sql/core/columns.sql b/sql/core/columns.sql index 3b1c541..1f8cb21 100644 --- a/sql/core/columns.sql +++ b/sql/core/columns.sql @@ -1,6 +1,6 @@ CREATE TABLE IF NOT EXISTS core.columns ( + column_id TEXT PRIMARY KEY, tenant_id TEXT, - table_id TEXT, name TEXT, created_ts TIMESTAMP, updated_ts TIMESTAMP diff --git a/sql/core/table_columns.sql b/sql/core/table_columns.sql index ff563d1..85c0cf2 100644 --- a/sql/core/table_columns.sql +++ b/sql/core/table_columns.sql @@ -3,6 +3,7 @@ CREATE TABLE IF NOT EXISTS core.table_columns ( table_id TEXT, table_name TEXT, column_name TEXT, + column_id TEXT, created_ts TIMESTAMP, updated_ts TIMESTAMP ); diff --git a/sql/core/tables.sql b/sql/core/tables.sql index 9ffd715..99e2536 100644 --- a/sql/core/tables.sql +++ b/sql/core/tables.sql @@ -1,8 +1,5 @@ CREATE TABLE IF NOT EXISTS core.tables ( tenant_id TEXT, - agent_internal_id TEXT, - agent_id TEXT, - tool_id TEXT, table_id TEXT PRIMARY KEY, name TEXT, country_of_provenance TEXT, diff --git a/sql/core/tool_tables.sql b/sql/core/tool_tables.sql index 7107412..578f0d1 100644 --- a/sql/core/tool_tables.sql +++ b/sql/core/tool_tables.sql @@ -1,11 +1,9 @@ CREATE TABLE IF NOT EXISTS core.tool_tables ( - tenant_id TEXT, - tool_id TEXT, - tool_name TEXT, - table_id TEXT, - table_name TEXT, - agent_id TEXT, - agent_internal_id TEXT, - created_ts TIMESTAMP, - updated_ts TIMESTAMP + tenant_id TEXT, + tool_id TEXT, + tool_name TEXT, + table_id TEXT, + table_name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP ); diff --git a/sql/core/zz_agent_upsert_unique_indexes.sql b/sql/core/zz_agent_upsert_unique_indexes.sql index 4ef0ad2..9c9c4cb 100644 --- a/sql/core/zz_agent_upsert_unique_indexes.sql +++ b/sql/core/zz_agent_upsert_unique_indexes.sql @@ -61,14 +61,9 @@ ON core.business_applications (business_application_id); CREATE UNIQUE INDEX IF NOT EXISTS ux_core_business_processes ON core.business_processes (business_process_id); -CREATE UNIQUE INDEX IF NOT EXISTS ux_core_columns -ON core.columns (table_id, name); +-- ux_core_columns removed: column_id is now the PRIMARY KEY -CREATE UNIQUE INDEX IF NOT EXISTS ux_core_tables -ON core.tables (agent_internal_id, table_id); - -CREATE INDEX IF NOT EXISTS ix_core_tables_agent_tool -ON core.tables (agent_internal_id, tool_id); +-- ux_core_tables removed: table_id is already the PRIMARY KEY CREATE UNIQUE INDEX IF NOT EXISTS ux_core_tool_tables ON core.tool_tables (tenant_id, tool_id, table_id); @@ -81,6 +76,106 @@ ON core.table_columns (tenant_id, table_id, column_name); DO $$ BEGIN + IF to_regclass('core.tables') IS NOT NULL THEN + ALTER TABLE core.tables DROP CONSTRAINT IF EXISTS fk_core_tables_agent_tool; + DROP INDEX IF EXISTS core.ix_core_tables_agent_tool; + + IF to_regclass('core.agent_tables') IS NOT NULL + AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'agent_id' + ) + THEN + EXECUTE ' + INSERT INTO core.agent_tables ( + tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts + ) + SELECT + t.tenant_id, + t.agent_id, + ag.agent_name, + t.agent_internal_id, + t.table_id, + t.name, + COALESCE(t.created_ts, CURRENT_TIMESTAMP), + CURRENT_TIMESTAMP + FROM core.tables t + LEFT JOIN core.agents ag + ON ag.agent_id = t.agent_id + AND ag.agent_internal_id = t.agent_internal_id + WHERE t.agent_id IS NOT NULL + AND t.agent_id <> '''' + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = COALESCE(EXCLUDED.agent_name, core.agent_tables.agent_name), + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, core.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + '; + END IF; + + IF to_regclass('core.tool_tables') IS NOT NULL + AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'tool_id' + ) + THEN + EXECUTE ' + INSERT INTO core.tool_tables ( + tenant_id, tool_id, tool_name, table_id, table_name, + created_ts, updated_ts + ) + SELECT + t.tenant_id, + t.tool_id, + at.tool_name, + t.table_id, + t.name, + COALESCE(t.created_ts, CURRENT_TIMESTAMP), + CURRENT_TIMESTAMP + FROM core.tables t + LEFT JOIN core.agent_tools at + ON at.tool_id = t.tool_id + AND at.agent_internal_id = t.agent_internal_id + WHERE t.tool_id IS NOT NULL + AND t.tool_id <> '''' + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, core.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, core.tool_tables.table_name), + updated_ts = EXCLUDED.updated_ts + '; + END IF; + + IF to_regclass('core.tool_tables') IS NOT NULL THEN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tool_tables' AND column_name = 'agent_id' + ) THEN + ALTER TABLE core.tool_tables DROP COLUMN agent_id; + END IF; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tool_tables' AND column_name = 'agent_internal_id' + ) THEN + ALTER TABLE core.tool_tables DROP COLUMN agent_internal_id; + END IF; + END IF; + + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'agent_id' + ) THEN + ALTER TABLE core.tables DROP COLUMN agent_id; + END IF; + + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'tool_id' + ) THEN + ALTER TABLE core.tables DROP COLUMN tool_id; + END IF; + END IF; + IF to_regclass('core.agent_ai_use_cases') IS NOT NULL THEN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns @@ -236,15 +331,10 @@ BEGIN BEGIN -- For rows that include agent_internal_id, prefer that lookup IF NEW.tenant_id IS NULL OR NEW.tenant_id = '' THEN - IF TG_TABLE_NAME = 'agent_tools' OR TG_TABLE_NAME = 'tables' THEN + IF TG_TABLE_NAME = 'agent_tools' THEN IF NEW.agent_internal_id IS NOT NULL THEN SELECT tenant_id INTO NEW.tenant_id FROM core.agents WHERE agent_internal_id = NEW.agent_internal_id LIMIT 1; END IF; - ELSIF TG_TABLE_NAME = 'columns' THEN - -- columns only reference table_id; derive tenant from core.tables - IF NEW.table_id IS NOT NULL THEN - SELECT tenant_id INTO NEW.tenant_id FROM core.tables WHERE table_id = NEW.table_id LIMIT 1; - END IF; END IF; END IF; RETURN NEW; @@ -265,7 +355,6 @@ BEGIN IF to_regclass('core.columns') IS NOT NULL THEN EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_columns ON core.columns'; - EXECUTE 'CREATE TRIGGER trg_populate_tenant_columns BEFORE INSERT OR UPDATE ON core.columns FOR EACH ROW EXECUTE FUNCTION core.populate_tenant_from_agent()'; END IF; END IF; @@ -312,49 +401,41 @@ BEGIN END IF; END IF; - IF to_regclass('core.tables') IS NOT NULL - AND to_regclass('core.columns') IS NOT NULL + IF to_regclass('core.table_columns') IS NOT NULL AND NOT EXISTS ( - SELECT 1 - FROM pg_constraint - WHERE conname = 'fk_core_columns_table' + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'table_columns' AND column_name = 'column_id' ) THEN - ALTER TABLE core.columns - ADD CONSTRAINT fk_core_columns_table - FOREIGN KEY (table_id) - REFERENCES core.tables (table_id) - ON DELETE CASCADE; + ALTER TABLE core.table_columns ADD COLUMN column_id TEXT; END IF; - IF to_regclass('core.agents') IS NOT NULL - AND to_regclass('core.tables') IS NOT NULL - AND NOT EXISTS ( - SELECT 1 - FROM pg_constraint - WHERE conname = 'fk_core_tables_agent' - ) - THEN - ALTER TABLE core.tables - ADD CONSTRAINT fk_core_tables_agent - FOREIGN KEY (agent_internal_id) - REFERENCES core.agents (agent_internal_id) - ON DELETE CASCADE; + IF to_regclass('core.columns') IS NOT NULL THEN + ALTER TABLE core.columns DROP CONSTRAINT IF EXISTS fk_core_columns_table; + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'columns' AND column_name = 'column_id' + ) THEN + ALTER TABLE core.columns ADD COLUMN column_id TEXT; + UPDATE core.columns SET column_id = gen_random_uuid()::text WHERE column_id IS NULL; + ALTER TABLE core.columns ADD PRIMARY KEY (column_id); + END IF; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'columns' AND column_name = 'table_id' + ) THEN + ALTER TABLE core.columns DROP COLUMN table_id; + END IF; END IF; - IF to_regclass('core.agent_tools') IS NOT NULL - AND to_regclass('core.tables') IS NOT NULL - AND NOT EXISTS ( - SELECT 1 - FROM pg_constraint - WHERE conname = 'fk_core_tables_agent_tool' - ) - THEN - ALTER TABLE core.tables - ADD CONSTRAINT fk_core_tables_agent_tool - FOREIGN KEY (agent_internal_id, tool_id) - REFERENCES core.agent_tools (agent_internal_id, tool_id) - ON DELETE CASCADE; + IF to_regclass('core.tables') IS NOT NULL THEN + ALTER TABLE core.tables DROP CONSTRAINT IF EXISTS fk_core_tables_agent; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'agent_internal_id' + ) THEN + ALTER TABLE core.tables DROP COLUMN agent_internal_id; + END IF; END IF; END $$; diff --git a/tavro_api/api/routers/agents.py b/tavro_api/api/routers/agents.py index c5e4897..4f5a407 100644 --- a/tavro_api/api/routers/agents.py +++ b/tavro_api/api/routers/agents.py @@ -425,6 +425,7 @@ def _write_agent_card( "uses_pci": None, }) for column_name in table.get("columns") or []: + col_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) data_source_entries.append({ "relationship_id": None, "parent_relationship_id": None, @@ -432,7 +433,7 @@ def _write_agent_card( "source_object_domain": None, "source_object_name": table_name, "source_object_type": "Table", - "target_object_id": column_name, + "target_object_id": col_id, "target_object_domain": None, "target_object_name": column_name, "target_object_type": "Column", @@ -594,26 +595,20 @@ async def create_agent( table["table_id"] = table_id table_name = table.get("name") table_tool_id = table.get("tool_id") - table_agent_id = None if table_tool_id else agent_id await db.execute( text(f""" INSERT INTO {CORE}.tables - (tenant_id, agent_internal_id, agent_id, tool_id, table_id, name, created_ts, updated_ts) + (tenant_id, table_id, name, created_ts, updated_ts) VALUES - (:tid, :iid, :table_agent_id, :tool_id, :table_id, :name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (agent_internal_id, table_id) + (:tid, :table_id, :name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (table_id) DO UPDATE SET - agent_id = EXCLUDED.agent_id, - tool_id = EXCLUDED.tool_id, name = COALESCE(EXCLUDED.name, {CORE}.tables.name), updated_ts = EXCLUDED.updated_ts """), { "tid": tenant_id, - "iid": agent_internal_id, - "table_agent_id": table_agent_id, - "tool_id": table_tool_id, "table_id": table_id, "name": table_name, }, @@ -672,15 +667,13 @@ async def create_agent( text(f""" INSERT INTO {CORE}.tool_tables (tenant_id, tool_id, tool_name, table_id, table_name, - agent_id, agent_internal_id, created_ts, updated_ts) + created_ts, updated_ts) VALUES (:tid, :tool_id, :tool_name, :table_id, :table_name, - :aid, :iid, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET tool_name = COALESCE(EXCLUDED.tool_name, {CORE}.tool_tables.tool_name), table_name = COALESCE(EXCLUDED.table_name, {CORE}.tool_tables.table_name), - agent_id = EXCLUDED.agent_id, - agent_internal_id = EXCLUDED.agent_internal_id, updated_ts = EXCLUDED.updated_ts """), { @@ -689,36 +682,36 @@ async def create_agent( "tool_name": table.get("tool_name"), "table_id": table_id, "table_name": table_name, - "aid": agent_id, - "iid": agent_internal_id, }, ) for column_name in table.get("columns") or []: + column_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) await db.execute( text(f""" - INSERT INTO {CORE}.columns (tenant_id, table_id, name, created_ts, updated_ts) - VALUES (:tid, :table_id, :col_name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT (table_id, name) + INSERT INTO {CORE}.columns (column_id, tenant_id, name, created_ts, updated_ts) + VALUES (:col_id, :tid, :col_name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (column_id) DO UPDATE SET tenant_id = EXCLUDED.tenant_id, updated_ts = EXCLUDED.updated_ts """), - {"tid": tenant_id, "table_id": table_id, "col_name": column_name}, + {"col_id": column_id, "tid": tenant_id, "col_name": column_name}, ) await db.execute( text(f""" INSERT INTO {CORE}.table_columns - (tenant_id, table_id, table_name, column_name, created_ts, updated_ts) + (tenant_id, table_id, table_name, column_name, column_id, created_ts, updated_ts) VALUES - (:tid, :table_id, :table_name, :column_name, + (:tid, :table_id, :table_name, :column_name, :col_id, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET table_name = COALESCE(EXCLUDED.table_name, {CORE}.table_columns.table_name), + column_id = COALESCE(EXCLUDED.column_id, {CORE}.table_columns.column_id), updated_ts = EXCLUDED.updated_ts """), {"tid": tenant_id, "table_id": table_id, - "table_name": table_name, "column_name": column_name}, + "table_name": table_name, "column_name": column_name, "col_id": column_id}, ) await db.execute( text(f""" @@ -732,7 +725,7 @@ async def create_agent( :tid, :iid, :aid, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, :table_id, :table_name, 'Table', - :column_name, :column_name, 'Column' + :col_id, :column_name, 'Column' ) ON CONFLICT (agent_internal_id, source_object_id, target_object_id) DO UPDATE SET @@ -746,6 +739,7 @@ async def create_agent( "aid": agent_id, "table_id": table_id, "table_name": table_name, + "col_id": column_id, "column_name": column_name, }, ) diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index 627ab8b..6c8013c 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -913,6 +913,7 @@ def _write_agent_card( "uses_pci": None, }) for column_name in table.get("columns") or []: + col_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) data_source_entries.append({ "relationship_id": None, "parent_relationship_id": None, @@ -920,7 +921,7 @@ def _write_agent_card( "source_object_domain": None, "source_object_name": table_name, "source_object_type": "Table", - "target_object_id": column_name, + "target_object_id": col_id, "target_object_domain": None, "target_object_name": column_name, "target_object_type": "Column", @@ -1156,15 +1157,10 @@ def create_agent( table_name = cls.sanitize(table.get("name") or "") table_tool_id = cls.sanitize(table.get("tool_id") or "") table_tool_name = cls.sanitize(table.get("tool_name") or "") - table_agent_id_value = "NULL" if table_tool_id else f"'{agent_id}'" - table_tool_id_value = f"'{table_tool_id}'" if table_tool_id else "NULL" table_values.append(f""" ( {tenant_id_value} - '{agent_internal_id}', - {table_agent_id_value}, - {table_tool_id_value}, '{table_id}', '{table_name}', TIMESTAMP '{now}', @@ -1210,8 +1206,6 @@ def create_agent( '{table_tool_name}', '{table_id}', '{table_name}', - '{agent_id}', - '{agent_internal_id}', TIMESTAMP '{now}', TIMESTAMP '{now}' ) @@ -1221,10 +1215,11 @@ def create_agent( clean_column = cls.sanitize(column_name) if not clean_column: continue + column_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{clean_column}")) column_values.append(f""" ( + '{column_id}', {tenant_id_value} - '{table_id}', '{clean_column}', TIMESTAMP '{now}', TIMESTAMP '{now}' @@ -1237,6 +1232,7 @@ def create_agent( '{table_id}', '{table_name}', '{clean_column}', + '{column_id}', TIMESTAMP '{now}', TIMESTAMP '{now}' ) @@ -1251,7 +1247,7 @@ def create_agent( '{table_id}', '{table_name}', 'Table', - '{clean_column}', + '{column_id}', '{clean_column}', 'Column' ) @@ -1261,9 +1257,6 @@ def create_agent( queries.append(f""" INSERT INTO {cls.CORE_DB_NAME}.tables ( {tenant_id_column} - agent_internal_id, - agent_id, - tool_id, table_id, name, created_ts, @@ -1271,10 +1264,8 @@ def create_agent( ) VALUES {",".join(table_values)} - ON CONFLICT (agent_internal_id, table_id) + ON CONFLICT (table_id) DO UPDATE SET - agent_id = EXCLUDED.agent_id, - tool_id = EXCLUDED.tool_id, name = COALESCE(EXCLUDED.name, {cls.CORE_DB_NAME}.tables.name), updated_ts = EXCLUDED.updated_ts """) @@ -1282,15 +1273,15 @@ def create_agent( if column_values: queries.append(f""" INSERT INTO {cls.CORE_DB_NAME}.columns ( + column_id, {tenant_id_column} - table_id, name, created_ts, updated_ts ) VALUES {",".join(column_values)} - ON CONFLICT (table_id, name) + ON CONFLICT (column_id) DO UPDATE SET updated_ts = EXCLUDED.updated_ts """) @@ -1316,15 +1307,13 @@ def create_agent( INSERT INTO {cls.CORE_DB_NAME}.tool_tables ( {tenant_id_column} tool_id, tool_name, table_id, table_name, - agent_id, agent_internal_id, created_ts, updated_ts + created_ts, updated_ts ) VALUES {",".join(tool_table_values)} ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET tool_name = COALESCE(EXCLUDED.tool_name, {cls.CORE_DB_NAME}.tool_tables.tool_name), table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.tool_tables.table_name), - agent_id = EXCLUDED.agent_id, - agent_internal_id = EXCLUDED.agent_internal_id, updated_ts = EXCLUDED.updated_ts """) @@ -1332,12 +1321,13 @@ def create_agent( queries.append(f""" INSERT INTO {cls.CORE_DB_NAME}.table_columns ( {tenant_id_column} - table_id, table_name, column_name, created_ts, updated_ts + table_id, table_name, column_name, column_id, created_ts, updated_ts ) VALUES {",".join(table_column_values)} ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.table_columns.table_name), + column_id = COALESCE(EXCLUDED.column_id, {cls.CORE_DB_NAME}.table_columns.column_id), updated_ts = EXCLUDED.updated_ts """) @@ -2134,6 +2124,7 @@ def update_agent( tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None, + columns: Optional[List[Dict[str, Any]]] = None, tenant_id: Optional[str] = None ) -> Dict[str, Any]: """ @@ -2206,8 +2197,7 @@ def update_agent( if lookup_name: found = cls.execute_select( f"SELECT table_id FROM {cls.CORE_DB_NAME}.tables " - f"WHERE agent_internal_id = '{agent_internal_id}' " - f"AND LOWER(name) = LOWER('{lookup_name}') LIMIT 1" + f"WHERE LOWER(name) = LOWER('{lookup_name}') {tenant_where} LIMIT 1" ) if found: table_id = cls.sanitize(str(found[0].get("table_id") or "").strip()) @@ -2218,7 +2208,7 @@ def update_agent( # Update core.tables cls.execute_dml( f"UPDATE {cls.CORE_DB_NAME}.tables SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE table_id = '{table_id}' AND agent_internal_id = '{agent_internal_id}'" + f"WHERE table_id = '{table_id}'" ) # Propagate new name to all three relationship tables cls.execute_dml( @@ -2252,9 +2242,51 @@ def update_agent( ) tables_updated += 1 + columns_updated = 0 + for col in (columns or []): + if not isinstance(col, dict): + continue + new_name = cls.sanitize(str(col.get("name") or "").strip()) + old_name = cls.sanitize(str(col.get("old_name") or "").strip()) + if not new_name or not old_name: + continue + + scoped_table_id = cls.sanitize(str(col.get("table_id") or "").strip()) + table_filter = f"AND table_id = '{scoped_table_id}'" if scoped_table_id else "" + tenant_filter = f"AND tenant_id = '{tenant_clean}'" if is_tenant else "" + + found = cls.execute_select( + f"SELECT column_id FROM {cls.CORE_DB_NAME}.table_columns " + f"WHERE LOWER(column_name) = LOWER('{old_name}') {table_filter} {tenant_filter} LIMIT 1" + ) + if not found: + continue + column_id = cls.sanitize(str(found[0].get("column_id") or "").strip()) + if not column_id: + continue + + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.columns SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE column_id = '{column_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.table_columns SET column_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE column_id = '{column_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET target_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND LOWER(target_object_type) = 'column' " + f"AND LOWER(target_object_name) = LOWER('{old_name}')" + ) + columns_updated += 1 + msg = "Agent updated successfully." if tables_updated: msg += f" {tables_updated} table(s) renamed." + if columns_updated: + msg += f" {columns_updated} column(s) renamed." return {"message": msg, "agent_id": agent_id} @classmethod From e4f5e9ddd7f8ef6b51cd9c8939de7904564ca38d Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Fri, 5 Jun 2026 18:41:24 +0530 Subject: [PATCH 5/7] updated the code removed the conflict queries and create agent and update agent tools parameters for consistency --- mcp_server/server.py | 7 +- tavro_library/agent_library.py | 297 +++++++++++++++++++++++++-------- 2 files changed, 231 insertions(+), 73 deletions(-) diff --git a/mcp_server/server.py b/mcp_server/server.py index 71c1edd..194f2da 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -351,6 +351,7 @@ async def create_agent( instruction: str, tools: Optional[List[Dict[str, Any]]] = None, tables: Optional[List[Dict[str, Any]]] = None, + columns: Optional[List[Dict[str, Any]]] = None, data_source: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, ) -> Dict[str, Any]: @@ -435,6 +436,7 @@ async def create_agent( "instruction": instruction, "tools": tools, "tables": tables, + "columns": columns, "data_source": data_source, "knowledge_source": knowledge_source, }, @@ -447,6 +449,7 @@ async def create_agent( instruction=instruction, tools=tools, tables=tables, + columns=columns, data_source=data_source, knowledge_source=knowledge_source, tenant_id=tenant_id, @@ -754,7 +757,7 @@ async def remove_ai_use_case_agent_relationship(original_prompt: str, *, agent_c return {"error": "INTERNAL_ERROR", "details": str(e)} @core.tool(name="update_agent") -async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None, columns: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: +async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None, columns: Optional[List[Dict[str, Any]]] = None, data_source: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: """ Update an existing AI agent’s configuration. @@ -811,6 +814,7 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, "knowledge_source": knowledge_source, "tables": tables, "columns": columns, + "data_source": data_source, }, tenant_id, ) @@ -824,6 +828,7 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, knowledge_source=knowledge_source, tables=tables, columns=columns, + data_source=data_source, tenant_id=str(tenant_id), ) diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index 6c8013c..a8282fa 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -432,6 +432,39 @@ def get_agent_card(cls, agent_name: Optional[str] = None, agent_id: Optional[str except Exception as ds_overlay_err: print(f"[get_agent_card] Data source overlay failed (returning card as-is): {ds_overlay_err}") + # Overlay tool list from DB so tools added via update_agent are visible. + try: + tool_rows = cls.execute_select( + f""" + SELECT tool_id, tool_name, tool_description, + delegation_possible, allowed_delegates, + input_schema_json_text, output_schema_json_text, + default_config_json_text + FROM {cls.CORE_DB_NAME}.agent_tools + WHERE agent_id = %s + ORDER BY created_ts NULLS LAST + """, + (agent_id_clean,), + ) + if tool_rows: + local_card["tool"] = [ + { + "identifier": r.get("tool_id"), + "name": r.get("tool_name"), + "description": r.get("tool_description"), + "delegation_possible": r.get("delegation_possible"), + "allowed_delegates": r.get("allowed_delegates"), + "parameter_name": None, + "parameter_type": None, + "default_value": r.get("default_config_json_text"), + "input_schema": r.get("input_schema_json_text"), + "output_schema": r.get("output_schema_json_text"), + } + for r in tool_rows + ] + except Exception as tool_overlay_err: + print(f"[get_agent_card] Tool overlay failed (returning card as-is): {tool_overlay_err}") + return local_card # ---------- 7. Not found ---------- @@ -716,7 +749,7 @@ def _normalize_tables_payload( existing_columns.add(column_key) for item in normalized.values(): - item["table_id"] = item.get("table_id") or str(uuid.uuid4()) + item["table_id"] = str(uuid.uuid4()) return list(normalized.values()) @staticmethod @@ -913,7 +946,7 @@ def _write_agent_card( "uses_pci": None, }) for column_name in table.get("columns") or []: - col_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) + col_id = str(uuid.uuid4()) data_source_entries.append({ "relationship_id": None, "parent_relationship_id": None, @@ -1013,6 +1046,7 @@ def create_agent( instruction: str, tools: Optional[List[Dict[str, Any]]] = None, tables: Optional[List[Dict[str, Any]]] = None, + columns: Optional[List[Dict[str, Any]]] = None, data_source: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tenant_id: Optional[str] = None @@ -1043,6 +1077,22 @@ def create_agent( tool_name_to_id: Dict[str, str] = {} tables_payload = cls._normalize_tables_payload(tables, tools, data_source) + # Merge top-level columns into their matching table entries + for col_entry in (columns or []): + if not isinstance(col_entry, dict): + continue + col_name = str(col_entry.get("name") or "").strip() + if not col_name: + continue + match_id = str(col_entry.get("table_id") or "").strip() + match_name = str(col_entry.get("table_name") or "").strip().lower() + for tbl in tables_payload: + if (match_id and tbl.get("table_id") == match_id) or \ + (match_name and str(tbl.get("name") or "").strip().lower() == match_name): + if col_name not in (tbl.get("columns") or []): + tbl.setdefault("columns", []).append(col_name) + break + # 1. agents table tenant_id_value = f"'{tenant_id}'," if tenant_id else "" tenant_id_column = "tenant_id," if tenant_id else "" @@ -1152,7 +1202,7 @@ def create_agent( if tool_name_key and not table.get("tool_id"): table["tool_id"] = tool_name_to_id.get(tool_name_key) - table_id = cls.sanitize(table.get("table_id") or str(uuid.uuid4())) + table_id = str(uuid.uuid4()) table["table_id"] = table_id table_name = cls.sanitize(table.get("name") or "") table_tool_id = cls.sanitize(table.get("tool_id") or "") @@ -1215,7 +1265,7 @@ def create_agent( clean_column = cls.sanitize(column_name) if not clean_column: continue - column_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{clean_column}")) + column_id = str(uuid.uuid4()) column_values.append(f""" ( '{column_id}', @@ -1264,10 +1314,6 @@ def create_agent( ) VALUES {",".join(table_values)} - ON CONFLICT (table_id) - DO UPDATE SET - name = COALESCE(EXCLUDED.name, {cls.CORE_DB_NAME}.tables.name), - updated_ts = EXCLUDED.updated_ts """) if column_values: @@ -1281,9 +1327,6 @@ def create_agent( ) VALUES {",".join(column_values)} - ON CONFLICT (column_id) - DO UPDATE SET - updated_ts = EXCLUDED.updated_ts """) if agent_table_values: @@ -1295,11 +1338,6 @@ def create_agent( ) VALUES {",".join(agent_table_values)} - ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET - agent_name = EXCLUDED.agent_name, - agent_internal_id = EXCLUDED.agent_internal_id, - table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.agent_tables.table_name), - updated_ts = EXCLUDED.updated_ts """) if tool_table_values: @@ -1311,10 +1349,6 @@ def create_agent( ) VALUES {",".join(tool_table_values)} - ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET - tool_name = COALESCE(EXCLUDED.tool_name, {cls.CORE_DB_NAME}.tool_tables.tool_name), - table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.tool_tables.table_name), - updated_ts = EXCLUDED.updated_ts """) if table_column_values: @@ -1325,10 +1359,6 @@ def create_agent( ) VALUES {",".join(table_column_values)} - ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET - table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.table_columns.table_name), - column_id = COALESCE(EXCLUDED.column_id, {cls.CORE_DB_NAME}.table_columns.column_id), - updated_ts = EXCLUDED.updated_ts """) # 4. knowledge sources (ONLY name + description) @@ -2125,6 +2155,7 @@ def update_agent( knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None, columns: Optional[List[Dict[str, Any]]] = None, + data_source: Optional[List[Dict[str, Any]]] = None, tenant_id: Optional[str] = None ) -> Dict[str, Any]: """ @@ -2154,6 +2185,7 @@ def update_agent( raise ValueError(f"Agent '{agent_id}' not found.") agent_internal_id = rows[0].get("agent_internal_id") + current_agent_name = cls.sanitize(str(rows[0].get("agent_name") or "").strip()) now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Batch updates into single transaction @@ -2169,12 +2201,37 @@ def update_agent( cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_identifications ({tenant_col}agent_internal_id, agent_id, instruction, created_ts, updated_ts, is_current) VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', '{instr}', TIMESTAMP '{now}', TIMESTAMP '{now}', true)") if tools: - cls.execute_dml(f"DELETE FROM {cls.CORE_DB_NAME}.agent_tools WHERE agent_id = '{agent_id}' {tenant_where}") - vals = [] + tool_rows = [] + tool_ds_rows = [] for t in tools: - vals.append(f"({tenant_val}'{agent_internal_id}', '{agent_id}', '{cls.sanitize(t.get('name', ''))}', '{cls.sanitize(t.get('description', ''))}', TIMESTAMP '{now}', TIMESTAMP '{now}')") - if vals: - cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_tools ({tenant_col}agent_internal_id, agent_id, tool_name, tool_description, created_ts, updated_ts) VALUES {','.join(vals)}") + tool_id = str(uuid.uuid4()) + t_name = cls.sanitize(t.get("name", "")) + t_desc = cls.sanitize(t.get("description", "")) + tool_rows.append( + f"({tenant_val}'{agent_internal_id}', '{tool_id}', '{agent_id}', " + f"'{t_name}', '{t_desc}', TIMESTAMP '{now}', TIMESTAMP '{now}')" + ) + tool_ds_rows.append( + f"({tenant_val}'{agent_internal_id}', '{agent_id}', " + f"NULL, NULL::boolean, NULL::boolean, NULL::boolean, " + f"TIMESTAMP '{now}', TIMESTAMP '{now}', " + f"'{agent_id}', NULL, '{current_agent_name}', 'Agent', " + f"'{tool_id}', NULL, '{t_name}', 'Tool')" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_tools " + f"({tenant_col}agent_internal_id, tool_id, agent_id, tool_name, tool_description, created_ts, updated_ts) " + f"VALUES {','.join(tool_rows)}" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources " + f"({tenant_col}agent_internal_id, agent_id, " + f"access_level, contains_pii, contains_phi, contains_pci, " + f"created_ts, updated_ts, " + f"source_object_id, source_object_domain, source_object_name, source_object_type, " + f"target_object_id, target_object_domain, target_object_name, target_object_type) " + f"VALUES {','.join(tool_ds_rows)}" + ) if knowledge_source: cls.execute_dml(f"DELETE FROM {cls.CORE_DB_NAME}.agent_knowledge_sources WHERE agent_id = '{agent_id}' {tenant_where}") @@ -2182,6 +2239,11 @@ def update_agent( ks_desc = cls.sanitize(knowledge_source.get("description", "")) cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_knowledge_sources ({tenant_col}agent_internal_id, agent_id, name, description, created_ts, updated_ts) VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', '{ks_name}', '{ks_desc}', TIMESTAMP '{now}', TIMESTAMP '{now}')") + # Merge data_source entries into tables so both paths produce the same result + if data_source: + extra = cls._normalize_tables_payload(None, None, data_source) + tables = list(tables or []) + extra + tables_updated = 0 for table in (tables or []): if not isinstance(table, dict): @@ -2190,56 +2252,147 @@ def update_agent( if not new_name: continue - # Resolve table_id: prefer explicit table_id, fall back to old_name or current name lookup - table_id = cls.sanitize(str(table.get("table_id") or "").strip()) - if not table_id: - lookup_name = cls.sanitize(str(table.get("old_name") or table.get("name") or "").strip()) - if lookup_name: + old_name = cls.sanitize(str(table.get("old_name") or "").strip()) + + if old_name: + # ── RENAME path ────────────────────────────────────────────── + table_id = cls.sanitize(str(table.get("table_id") or "").strip()) + if not table_id: found = cls.execute_select( f"SELECT table_id FROM {cls.CORE_DB_NAME}.tables " - f"WHERE LOWER(name) = LOWER('{lookup_name}') {tenant_where} LIMIT 1" + f"WHERE LOWER(name) = LOWER('{old_name}') {tenant_where} LIMIT 1" ) if found: table_id = cls.sanitize(str(found[0].get("table_id") or "").strip()) + if not table_id: + continue - if not table_id: - continue + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.tables SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}' AND agent_id = '{agent_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.tool_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.table_columns SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET target_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND target_object_id = '{table_id}' " + f"AND LOWER(target_object_type) = 'table'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET source_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND source_object_id = '{table_id}' " + f"AND LOWER(source_object_type) = 'table'" + ) + + else: + # ── INSERT path (new table) ─────────────────────────────────── + table_id = str(uuid.uuid4()) + + tbl_tool_name = cls.sanitize(str(table.get("tool_name") or "").strip()) + tbl_tool_id = cls.sanitize(str(table.get("tool_id") or "").strip()) + if tbl_tool_name and not tbl_tool_id: + tool_rows = cls.execute_select( + f"SELECT tool_id FROM {cls.CORE_DB_NAME}.agent_tools " + f"WHERE agent_id = '{agent_id}' AND LOWER(tool_name) = LOWER('{tbl_tool_name}') {tenant_where} LIMIT 1" + ) + if tool_rows: + tbl_tool_id = cls.sanitize(str(tool_rows[0].get("tool_id") or "").strip()) + + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.tables ({tenant_col}table_id, name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{table_id}', '{new_name}', TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (table_id) DO UPDATE SET " + f"name = COALESCE(EXCLUDED.name, {cls.CORE_DB_NAME}.tables.name), updated_ts = EXCLUDED.updated_ts" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_tables " + f"({tenant_col}agent_id, agent_name, agent_internal_id, table_id, table_name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{agent_id}', '{current_agent_name}', '{agent_internal_id}', " + f"'{table_id}', '{new_name}', TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET " + f"table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.agent_tables.table_name), " + f"updated_ts = EXCLUDED.updated_ts" + ) + + src_id = tbl_tool_id or agent_id + src_name = tbl_tool_name or current_agent_name + src_type = 'Tool' if tbl_tool_id else 'Agent' + + if tbl_tool_id: + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.tool_tables " + f"({tenant_col}tool_id, tool_name, table_id, table_name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{tbl_tool_id}', '{tbl_tool_name}', '{table_id}', '{new_name}', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET " + f"table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.tool_tables.table_name), " + f"updated_ts = EXCLUDED.updated_ts" + ) + + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources " + f"({tenant_col}agent_internal_id, agent_id, " + f"source_object_id, source_object_name, source_object_type, " + f"target_object_id, target_object_name, target_object_type, " + f"created_ts, updated_ts) " + f"VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', " + f"'{src_id}', '{src_name}', '{src_type}', " + f"'{table_id}', '{new_name}', 'Table', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (agent_internal_id, source_object_id, target_object_id) DO UPDATE SET " + f"source_object_name = EXCLUDED.source_object_name, " + f"target_object_name = EXCLUDED.target_object_name, updated_ts = EXCLUDED.updated_ts" + ) + + for column_name in table.get("columns") or []: + clean_col = cls.sanitize(str(column_name).strip()) + if not clean_col: + continue + col_id = str(uuid.uuid4()) + + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.columns ({tenant_col}column_id, name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{col_id}', '{clean_col}', TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (column_id) DO UPDATE SET updated_ts = EXCLUDED.updated_ts" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.table_columns " + f"({tenant_col}table_id, table_name, column_name, column_id, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{table_id}', '{new_name}', '{clean_col}', '{col_id}', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET " + f"column_id = COALESCE(EXCLUDED.column_id, {cls.CORE_DB_NAME}.table_columns.column_id), " + f"updated_ts = EXCLUDED.updated_ts" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources " + f"({tenant_col}agent_internal_id, agent_id, " + f"source_object_id, source_object_name, source_object_type, " + f"target_object_id, target_object_name, target_object_type, " + f"created_ts, updated_ts) " + f"VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', " + f"'{table_id}', '{new_name}', 'Table', " + f"'{col_id}', '{clean_col}', 'Column', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (agent_internal_id, source_object_id, target_object_id) DO UPDATE SET " + f"source_object_name = EXCLUDED.source_object_name, " + f"target_object_name = EXCLUDED.target_object_name, updated_ts = EXCLUDED.updated_ts" + ) - # Update core.tables - cls.execute_dml( - f"UPDATE {cls.CORE_DB_NAME}.tables SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE table_id = '{table_id}'" - ) - # Propagate new name to all three relationship tables - cls.execute_dml( - f"UPDATE {cls.CORE_DB_NAME}.agent_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE table_id = '{table_id}' AND agent_id = '{agent_id}'" - ) - cls.execute_dml( - f"UPDATE {cls.CORE_DB_NAME}.tool_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE table_id = '{table_id}'" - ) - cls.execute_dml( - f"UPDATE {cls.CORE_DB_NAME}.table_columns SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE table_id = '{table_id}'" - ) - # Propagate new name to agent_data_sources (used by the UI lineage view) - # Table appears as a target in Tool→Table edges - cls.execute_dml( - f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " - f"SET target_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE agent_internal_id = '{agent_internal_id}' " - f"AND target_object_id = '{table_id}' " - f"AND LOWER(target_object_type) = 'table'" - ) - # Table appears as a source in Table→Column edges - cls.execute_dml( - f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " - f"SET source_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " - f"WHERE agent_internal_id = '{agent_internal_id}' " - f"AND source_object_id = '{table_id}' " - f"AND LOWER(source_object_type) = 'table'" - ) tables_updated += 1 columns_updated = 0 From d947232a9e6bf6a9559c23fe4bf5012a2564bceb Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Fri, 5 Jun 2026 19:27:07 +0530 Subject: [PATCH 6/7] updated the doc string to support column as optional parameter --- mcp_server/server.py | 16 +++- tavro_library/agent_library.py | 162 +++++++++++++++++++++++---------- 2 files changed, 130 insertions(+), 48 deletions(-) diff --git a/mcp_server/server.py b/mcp_server/server.py index 194f2da..555990c 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -376,19 +376,30 @@ async def create_agent( { "name": str, "description": str, - "table": {"name": str, "columns": [str]} + "table": {"name": str} } + Column metadata must be passed through the top-level `columns` parameter, + not nested inside tool table metadata. - `tables`: Optional explicit table metadata for the agent or tools: [ { "name": str, - "columns": [str], "tool_name": str } ] Use `tool_name` when the table belongs to a specific tool. + - `columns`: Optional explicit column metadata for tables: + [ + { + "name": str, + "table_name": str, + "table_id": str + } + ] + Use `table_name` or `table_id` to link each column to its table. + - `data_source`: Optional relationship-style metadata using Agent/Tool/Table/Column source and target object fields. @@ -416,6 +427,7 @@ async def create_agent( their roles generically (e.g. "upstream analytical agents") rather than fabricating names. tools (Optional[List[Dict[str, Any]]]): Optional list of tool definitions. tables (Optional[List[Dict[str, Any]]]): Optional table definitions. + columns (Optional[List[Dict[str, Any]]]): Optional column definitions. data_source (Optional[List[Dict[str, Any]]]): Optional data-source relationships. knowledge_source (Optional[Dict[str, str]]): Optional knowledge source definition. diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index a8282fa..9bcd6c5 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -645,12 +645,45 @@ def _table_items(cls, raw_tables: Any) -> List[Dict[str, Any]]: tables.append({ "table_id": cls._clean_text(raw.get("table_id") or raw.get("id") or raw.get("identifier")), "name": cls._clean_text(raw.get("name") or raw.get("table_name")), - "columns": cls._column_names(raw.get("columns") or raw.get("column")), "tool_name": cls._clean_text(raw.get("tool_name") or raw.get("tool")), "tool_id": cls._clean_text(raw.get("tool_id")), }) return tables + @classmethod + def _column_items(cls, raw_columns: Any) -> List[Dict[str, Any]]: + if not raw_columns: + return [] + if isinstance(raw_columns, dict): + raw_columns = [raw_columns] + elif isinstance(raw_columns, str): + raw_columns = [{"name": raw_columns}] + if not isinstance(raw_columns, list): + return [] + + columns: List[Dict[str, Any]] = [] + seen = set() + for raw in raw_columns: + if isinstance(raw, str): + raw = {"name": raw} + if not isinstance(raw, dict): + continue + name = cls._clean_text(raw.get("name") or raw.get("column_name") or raw.get("identifier")) + if not name: + continue + table_id = cls._clean_text(raw.get("table_id")) + table_name = cls._clean_text(raw.get("table_name") or raw.get("table")) + key = (name.lower(), (table_id or "").lower(), (table_name or "").lower()) + if key in seen: + continue + seen.add(key) + columns.append({ + "name": name, + "table_id": table_id, + "table_name": table_name, + }) + return columns + @classmethod def _tables_from_tools(cls, tools: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: tables: List[Dict[str, Any]] = [] @@ -660,15 +693,6 @@ def _tables_from_tools(cls, tools: Optional[List[Dict[str, Any]]]) -> List[Dict[ tool_name = cls._clean_text(tool.get("name")) tool_tables = cls._table_items(tool.get("tables") or tool.get("table")) - if not tool_tables and tool.get("columns"): - tool_tables = [{ - "table_id": None, - "name": cls._clean_text(tool.get("table_name")) or (f"{tool_name} table" if tool_name else None), - "columns": cls._column_names(tool.get("columns")), - "tool_name": tool_name, - "tool_id": None, - }] - for table in tool_tables: table["tool_name"] = table.get("tool_name") or tool_name tables.append(table) @@ -688,24 +712,41 @@ def _tables_from_data_sources(cls, data_sources: Optional[List[Dict[str, Any]]]) continue item = table_map.setdefault( table_id, - {"table_id": table_id, "name": cls._clean_text(entry.get("source_object_name")), "columns": [], "tool_name": None, "tool_id": None}, + {"table_id": table_id, "name": cls._clean_text(entry.get("source_object_name")), "tool_name": None, "tool_id": None}, ) - column_name = cls._clean_text(entry.get("target_object_name") or entry.get("target_object_id")) - if column_name and column_name not in item["columns"]: - item["columns"].append(column_name) elif src_type == "tool" and tgt_type == "table": table_id = cls._clean_text(entry.get("target_object_id")) if not table_id: continue item = table_map.setdefault( table_id, - {"table_id": table_id, "name": cls._clean_text(entry.get("target_object_name")), "columns": [], "tool_name": None, "tool_id": None}, + {"table_id": table_id, "name": cls._clean_text(entry.get("target_object_name")), "tool_name": None, "tool_id": None}, ) item["tool_id"] = cls._clean_text(entry.get("source_object_id")) item["tool_name"] = cls._clean_text(entry.get("source_object_name")) item["name"] = item.get("name") or cls._clean_text(entry.get("target_object_name")) return list(table_map.values()) + @classmethod + def _columns_from_data_sources(cls, data_sources: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + columns: List[Dict[str, Any]] = [] + for entry in data_sources or []: + if not isinstance(entry, dict): + continue + src_type = str(entry.get("source_object_type") or "").lower() + tgt_type = str(entry.get("target_object_type") or "").lower() + if src_type != "table" or tgt_type != "column": + continue + column_name = cls._clean_text(entry.get("target_object_name") or entry.get("target_object_id")) + if not column_name: + continue + columns.append({ + "name": column_name, + "table_id": cls._clean_text(entry.get("source_object_id")), + "table_name": cls._clean_text(entry.get("source_object_name")), + }) + return columns + @classmethod def _normalize_tables_payload( cls, @@ -731,26 +772,64 @@ def _normalize_tables_payload( key, { "table_id": raw_table_id, + "source_table_id": raw_table_id, "name": table_name, - "columns": [], "tool_name": table.get("tool_name"), "tool_id": table.get("tool_id"), }, ) item["table_id"] = item.get("table_id") or raw_table_id + item["source_table_id"] = item.get("source_table_id") or raw_table_id item["name"] = table_name or item.get("name") item["tool_name"] = table.get("tool_name") or item.get("tool_name") item["tool_id"] = table.get("tool_id") or item.get("tool_id") - existing_columns = {str(col).strip().lower() for col in item["columns"]} - for column_name in table.get("columns") or []: - column_key = str(column_name).strip().lower() - if column_key and column_key not in existing_columns: - item["columns"].append(column_name) - existing_columns.add(column_key) for item in normalized.values(): item["table_id"] = str(uuid.uuid4()) return list(normalized.values()) + + @classmethod + def _columns_by_table( + cls, + tables_payload: List[Dict[str, Any]], + columns: Any, + data_sources: Optional[List[Dict[str, Any]]], + ) -> Dict[int, List[str]]: + column_entries = [ + *cls._column_items(columns), + *cls._columns_from_data_sources(data_sources), + ] + columns_by_table: Dict[int, List[str]] = {} + + for col_entry in column_entries: + col_name = cls._clean_text(col_entry.get("name")) + if not col_name: + continue + match_id = str(col_entry.get("table_id") or "").strip() + match_name = str(col_entry.get("table_name") or "").strip().lower() + + matched_index: Optional[int] = None + for index, tbl in enumerate(tables_payload): + table_ids = { + str(tbl.get("table_id") or "").strip(), + str(tbl.get("source_table_id") or "").strip(), + } + table_name = str(tbl.get("name") or "").strip().lower() + if (match_id and match_id in table_ids) or (match_name and table_name == match_name): + matched_index = index + break + + if matched_index is None and len(tables_payload) == 1 and not match_id and not match_name: + matched_index = 0 + if matched_index is None: + continue + + existing = {name.strip().lower() for name in columns_by_table.get(matched_index, [])} + col_key = col_name.strip().lower() + if col_key not in existing: + columns_by_table.setdefault(matched_index, []).append(col_name) + + return columns_by_table @staticmethod def _normalize_tenant_id(value: Optional[Any]) -> Optional[str]: @@ -885,6 +964,7 @@ def _write_agent_card( knowledge_source: Optional[Dict[str, str]] = None, tool_ids: Optional[List[str]] = None, tables: Optional[List[Dict[str, Any]]] = None, + columns_by_table: Optional[Dict[int, List[str]]] = None, ) -> None: """Write a full agent card JSON file immediately after creation so get_agent_card returns complete details.""" try: @@ -924,7 +1004,7 @@ def _write_agent_card( "uses_pci": None, }) - for table in tables or []: + for table_index, table in enumerate(tables or []): table_id = table.get("table_id") table_name = table.get("name") if not table_id: @@ -945,7 +1025,7 @@ def _write_agent_card( "uses_phi": None, "uses_pci": None, }) - for column_name in table.get("columns") or []: + for column_name in (columns_by_table or {}).get(table_index, []): col_id = str(uuid.uuid4()) data_source_entries.append({ "relationship_id": None, @@ -1076,22 +1156,7 @@ def create_agent( tool_ids_for_card: List[str] = [] tool_name_to_id: Dict[str, str] = {} tables_payload = cls._normalize_tables_payload(tables, tools, data_source) - - # Merge top-level columns into their matching table entries - for col_entry in (columns or []): - if not isinstance(col_entry, dict): - continue - col_name = str(col_entry.get("name") or "").strip() - if not col_name: - continue - match_id = str(col_entry.get("table_id") or "").strip() - match_name = str(col_entry.get("table_name") or "").strip().lower() - for tbl in tables_payload: - if (match_id and tbl.get("table_id") == match_id) or \ - (match_name and str(tbl.get("name") or "").strip().lower() == match_name): - if col_name not in (tbl.get("columns") or []): - tbl.setdefault("columns", []).append(col_name) - break + columns_by_table = cls._columns_by_table(tables_payload, columns, data_source) # 1. agents table tenant_id_value = f"'{tenant_id}'," if tenant_id else "" @@ -1197,7 +1262,7 @@ def create_agent( """ queries.append(tools_query) - for table in tables_payload: + for table_index, table in enumerate(tables_payload): tool_name_key = str(table.get("tool_name") or "").strip().lower() if tool_name_key and not table.get("tool_id"): table["tool_id"] = tool_name_to_id.get(tool_name_key) @@ -1261,7 +1326,7 @@ def create_agent( ) """) - for column_name in table.get("columns") or []: + for column_name in columns_by_table.get(table_index, []): clean_column = cls.sanitize(column_name) if not clean_column: continue @@ -1421,6 +1486,7 @@ def create_agent( knowledge_source=knowledge_source, tool_ids=tool_ids_for_card, tables=tables_payload, + columns_by_table=columns_by_table, ) payload = { @@ -2243,11 +2309,15 @@ def update_agent( if data_source: extra = cls._normalize_tables_payload(None, None, data_source) tables = list(tables or []) + extra + tables_for_update = [table for table in (tables or []) if isinstance(table, dict)] + columns_for_new_tables = [ + col for col in (columns or []) + if isinstance(col, dict) and not col.get("old_name") + ] + columns_by_table = cls._columns_by_table(tables_for_update, columns_for_new_tables, data_source) tables_updated = 0 - for table in (tables or []): - if not isinstance(table, dict): - continue + for table_index, table in enumerate(tables_for_update): new_name = cls.sanitize(str(table.get("name") or "").strip()) if not new_name: continue @@ -2358,7 +2428,7 @@ def update_agent( f"target_object_name = EXCLUDED.target_object_name, updated_ts = EXCLUDED.updated_ts" ) - for column_name in table.get("columns") or []: + for column_name in columns_by_table.get(table_index, []): clean_col = cls.sanitize(str(column_name).strip()) if not clean_col: continue From c7bd2bc950c8c9ebed5f180170a46c999b07a270 Mon Sep 17 00:00:00 2001 From: prasannakumar-tavro Date: Fri, 5 Jun 2026 20:09:20 +0530 Subject: [PATCH 7/7] agent to table to column issue is fixed --- mcp_server/server.py | 3 +++ tavro_api/api/routers/agents.py | 40 +++++++++++++++++++++++++++++ tavro_app/src/services/mcpClient.ts | 1 + tavro_library/agent_library.py | 27 ++++++++++++++++++- 4 files changed, 70 insertions(+), 1 deletion(-) diff --git a/mcp_server/server.py b/mcp_server/server.py index 555990c..3a0ee11 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -389,6 +389,9 @@ async def create_agent( } ] Use `tool_name` when the table belongs to a specific tool. + Omit `tool_name` for direct agent-owned tables. Direct tables are represented + as Agent -> Table -> Column; tool-owned tables are represented as + Agent -> Tool -> Table -> Column. - `columns`: Optional explicit column metadata for tables: [ diff --git a/tavro_api/api/routers/agents.py b/tavro_api/api/routers/agents.py index 4f5a407..ab881fb 100644 --- a/tavro_api/api/routers/agents.py +++ b/tavro_api/api/routers/agents.py @@ -289,6 +289,21 @@ def _tables_from_data_sources(data_sources: Optional[List[Dict[str, Any]]]) -> L column_name = _clean_text(entry.get("target_object_name") or entry.get("target_object_id")) if column_name and column_name not in item["columns"]: item["columns"].append(column_name) + elif src_type == "agent" and tgt_type == "table": + table_id = _clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + { + "table_id": table_id, + "name": _clean_text(entry.get("target_object_name")), + "columns": [], + "tool_name": None, + "tool_id": None, + }, + ) + item["name"] = item.get("name") or _clean_text(entry.get("target_object_name")) elif src_type == "tool" and tgt_type == "table": table_id = _clean_text(entry.get("target_object_id")) if not table_id: @@ -684,6 +699,31 @@ async def create_agent( "table_name": table_name, }, ) + else: + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_data_sources ( + tenant_id, agent_internal_id, agent_id, + created_ts, updated_ts, + source_object_id, source_object_name, source_object_type, + target_object_id, target_object_name, target_object_type + ) + VALUES ( + :tid, :iid, :aid, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, + :aid, :agent_name, 'Agent', + :table_id, :table_name, 'Table' + ) + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "aid": agent_id, + "agent_name": body.agent_name, + "table_id": table_id, + "table_name": table_name, + }, + ) for column_name in table.get("columns") or []: column_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) diff --git a/tavro_app/src/services/mcpClient.ts b/tavro_app/src/services/mcpClient.ts index 35ae0c6..98c8b7a 100644 --- a/tavro_app/src/services/mcpClient.ts +++ b/tavro_app/src/services/mcpClient.ts @@ -1185,6 +1185,7 @@ Every generated value must be coherent with the blueprint. Do not fabricate data instruction, ...(args?.tools ? { tools: args.tools } : {}), ...(args?.tables ? { tables: args.tables } : {}), + ...(args?.columns ? { columns: args.columns } : {}), ...(args?.data_source ? { data_source: args.data_source } : {}), ...(args?.knowledge_source ? { knowledge_source: args.knowledge_source } : {}), ...(args?.original_prompt ? { original_prompt: args.original_prompt } : {}), diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index 9bcd6c5..8d72f2a 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -714,6 +714,15 @@ def _tables_from_data_sources(cls, data_sources: Optional[List[Dict[str, Any]]]) table_id, {"table_id": table_id, "name": cls._clean_text(entry.get("source_object_name")), "tool_name": None, "tool_id": None}, ) + elif src_type == "agent" and tgt_type == "table": + table_id = cls._clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + {"table_id": table_id, "name": cls._clean_text(entry.get("target_object_name")), "tool_name": None, "tool_id": None}, + ) + item["name"] = item.get("name") or cls._clean_text(entry.get("target_object_name")) elif src_type == "tool" and tgt_type == "table": table_id = cls._clean_text(entry.get("target_object_id")) if not table_id: @@ -1325,6 +1334,22 @@ def create_agent( TIMESTAMP '{now}' ) """) + else: + data_source_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + '{agent_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}', + '{agent_id}', + '{agent_name}', + 'Agent', + '{table_id}', + '{table_name}', + 'Table' + ) + """) for column_name in columns_by_table.get(table_index, []): clean_column = cls.sanitize(column_name) @@ -1451,7 +1476,7 @@ def create_agent( ) """) - # 5. data source insert (only if tools exist) + # 5. data source insert if data_source_values: queries.append(f""" INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources (