diff --git a/mcp_server/server.py b/mcp_server/server.py index 05fd11c..3a0ee11 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -343,7 +343,18 @@ async def get_agent_catalog(original_prompt: str, start_record: int = 1, record_ return {"error": "INTERNAL_ERROR", "details": str(e)} @core.tool(name="create_agent") -async def create_agent(original_prompt: str, *, agent_name: str, description: str, instruction: str, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None) -> Dict[str, Any]: +async def create_agent( + original_prompt: str, + *, + agent_name: str, + description: str, + instruction: str, + tools: Optional[List[Dict[str, Any]]] = None, + tables: Optional[List[Dict[str, Any]]] = None, + columns: Optional[List[Dict[str, Any]]] = None, + data_source: Optional[List[Dict[str, Any]]] = None, + knowledge_source: Optional[Dict[str, str]] = None, +) -> Dict[str, Any]: """ Create and register a new AI agent with defined identity, behavior, and optional integrations. @@ -361,6 +372,39 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st "name": str, "description": str } + A tool may optionally include table metadata it uses: + { + "name": str, + "description": str, + "table": {"name": str} + } + Column metadata must be passed through the top-level `columns` parameter, + not nested inside tool table metadata. + + - `tables`: Optional explicit table metadata for the agent or tools: + [ + { + "name": str, + "tool_name": str + } + ] + Use `tool_name` when the table belongs to a specific tool. + Omit `tool_name` for direct agent-owned tables. Direct tables are represented + as Agent -> Table -> Column; tool-owned tables are represented as + Agent -> Tool -> Table -> Column. + + - `columns`: Optional explicit column metadata for tables: + [ + { + "name": str, + "table_name": str, + "table_id": str + } + ] + Use `table_name` or `table_id` to link each column to its table. + + - `data_source`: Optional relationship-style metadata using Agent/Tool/Table/Column + source and target object fields. - `knowledge_source`: A reference to an external knowledge source that the agent can use for contextual understanding. If provided, it must follow: @@ -384,7 +428,10 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st "Revenue Agent") unless the user has explicitly named them or they are confirmed to exist in the catalog context. If the agent coordinates with upstream agents, describe their roles generically (e.g. "upstream analytical agents") rather than fabricating names. - tools (Optional[List[Dict[str, str]]]): Optional list of tool definitions. + tools (Optional[List[Dict[str, Any]]]): Optional list of tool definitions. + tables (Optional[List[Dict[str, Any]]]): Optional table definitions. + columns (Optional[List[Dict[str, Any]]]): Optional column definitions. + data_source (Optional[List[Dict[str, Any]]]): Optional data-source relationships. knowledge_source (Optional[Dict[str, str]]): Optional knowledge source definition. Returns: @@ -403,6 +450,9 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st "description": description, "instruction": instruction, "tools": tools, + "tables": tables, + "columns": columns, + "data_source": data_source, "knowledge_source": knowledge_source, }, tenant_id, @@ -413,6 +463,9 @@ async def create_agent(original_prompt: str, *, agent_name: str, description: st description=description, instruction=instruction, tools=tools, + tables=tables, + columns=columns, + data_source=data_source, knowledge_source=knowledge_source, tenant_id=tenant_id, ) @@ -719,12 +772,12 @@ async def remove_ai_use_case_agent_relationship(original_prompt: str, *, agent_c return {"error": "INTERNAL_ERROR", "details": str(e)} @core.tool(name="update_agent") -async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None) -> Dict[str, Any]: +async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, agent_name: Optional[str] = None, description: Optional[str] = None, instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tables: Optional[List[Dict[str, Any]]] = None, columns: Optional[List[Dict[str, Any]]] = None, data_source: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: """ Update an existing AI agent’s configuration. Allows modification of agent metadata such as name, description, - behavior instructions, tools, and knowledge sources. + behavior instructions, tools, knowledge sources, tables, and columns. Args: original_prompt (str): REQUIRED. Exact user message verbatim. @@ -736,6 +789,24 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, confirmed to exist. Describe inter-agent dependencies generically if unknown. tools (Optional[List[Dict[str, str]]]): Updated tool list. knowledge_source (Optional[Dict[str, str]]): Updated knowledge source. + tables (Optional[List[Dict[str, Any]]]): Tables to rename or update. Each entry must include + the new name and a way to identify the existing table: + { + "name": str, # new table name to set + "old_name": str, # current table name (use when table_id is unknown) + "table_id": str # table identifier (preferred when available) + } + Use "old_name" when the user refers to the current name (e.g. "rename + SNOW_incident to Incidents"). Use "table_id" when you already know it. + columns (Optional[List[Dict[str, Any]]]): Columns to rename. Each entry must include + the new name and a way to identify the existing column: + { + "name": str, # new column name to set + "old_name": str, # current column name (required) + "table_id": str # table the column belongs to (preferred for precision) + } + Use "old_name" for the current column name (e.g. "rename col_id to incident_id"). + Provide "table_id" when the same column name exists in multiple tables. Returns: Dict[str, Any]: Updated agent metadata or error response. @@ -756,6 +827,9 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, "instruction": instruction, "tools": tools, "knowledge_source": knowledge_source, + "tables": tables, + "columns": columns, + "data_source": data_source, }, tenant_id, ) @@ -767,6 +841,9 @@ async def update_agent(original_prompt: str, *, agent_id: Optional[str] = None, instruction=instruction, tools=tools, knowledge_source=knowledge_source, + tables=tables, + columns=columns, + data_source=data_source, tenant_id=str(tenant_id), ) diff --git a/services/upload_processor.py b/services/upload_processor.py index ee2327a..ae52994 100644 --- a/services/upload_processor.py +++ b/services/upload_processor.py @@ -277,7 +277,13 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): tools = card.get("tool", []) or [] if not has_meaningful_data(tools): return + tenant_id = ident.get("tenant_id") agent_id = ident.get("agent_id") + # If tenant_id wasn't provided on the incoming card, try to read it from the agents table + if tenant_id is None: + rows = _query(conn, f"SELECT tenant_id FROM {CORE}.agents WHERE agent_internal_id = {_sq(agent_internal_id)} LIMIT 1") + if rows and rows[0].get("tenant_id"): + tenant_id = rows[0].get("tenant_id") select_rows = [] for tool in tools: delegation_possible = ( @@ -285,7 +291,7 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): if tool.get("delegation_possible") is not None else None ) select_rows.append(f""" - SELECT {_sq(agent_internal_id)} AS agent_internal_id, {_sq(agent_id)} AS agent_id, + SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(agent_id)} AS agent_id, {_sq(tool.get('identifier'))} AS tool_id, {_sq(tool.get('name'))} AS tool_name, {_sq(tool.get('description'))} AS tool_description, {_bool(delegation_possible)}::boolean AS delegation_possible, @@ -298,18 +304,19 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): union_all = "\nUNION ALL\n".join(select_rows) _exec(conn, f""" INSERT INTO {CORE}.agent_tools ( - agent_internal_id, agent_id, tool_id, tool_name, tool_description, + tenant_id, agent_internal_id, agent_id, tool_id, tool_name, tool_description, delegation_possible, allowed_delegates, input_schema_json_text, output_schema_json_text, default_config_json_text, created_ts, updated_ts ) - SELECT agent_internal_id, agent_id, tool_id, tool_name, tool_description, + SELECT tenant_id, agent_internal_id, agent_id, tool_id, tool_name, tool_description, delegation_possible, allowed_delegates, input_schema_json_text, output_schema_json_text, default_config_json_text, now_ts, now_ts FROM ({union_all}) AS s ON CONFLICT (agent_internal_id, tool_id) DO UPDATE SET + tenant_id = EXCLUDED.tenant_id, agent_id = EXCLUDED.agent_id, tool_description = EXCLUDED.tool_description, delegation_possible = EXCLUDED.delegation_possible, @@ -321,6 +328,184 @@ def _upsert_agent_tools(conn, card: dict, agent_internal_id: str, now_str: str): """, f"agent_tools upsert ({len(tools)} tools)") +# --------------------------------------------------------------------------- +# Step X — core.tables (table catalog) +# --------------------------------------------------------------------------- +def _upsert_tables(conn, card: dict, agent_internal_id: str, now_str: str): + ident = card.get("identification", {}) + tenant_id = ident.get("tenant_id") + agent_id = ident.get("agent_id") + data_sources = card.get("data_source", []) or [] + if not has_meaningful_data(data_sources): + return + # If tenant_id wasn't provided on the incoming card, try to read it from the agents table + if tenant_id is None: + rows = _query(conn, f"SELECT tenant_id FROM {CORE}.agents WHERE agent_internal_id = {_sq(agent_internal_id)} LIMIT 1") + if rows and rows[0].get("tenant_id"): + tenant_id = rows[0].get("tenant_id") + + # Collect table records; tool_id is used only to populate tool_tables below. + tables = {} + # First pass: find explicit table nodes (as source or target) + # Also capture direct agent->table ownership when present. + for ds in data_sources: + src_type = ds.get("source_object_type") + tgt_type = ds.get("target_object_type") + # If source is Table, register it + if str(src_type).lower() == "table": + tid = ds.get("source_object_id") + name = ds.get("source_object_name") + if tid: + tables[tid] = tables.get(tid, {}) + if name: + tables[tid]["name"] = name + # If target is Table, register it + if str(tgt_type).lower() == "table": + tid = ds.get("target_object_id") + name = ds.get("target_object_name") + if tid: + tables[tid] = tables.get(tid, {}) + if name: + tables[tid]["name"] = name + # Direct agent -> table relationship should be stored on the table + if str(src_type).lower() == "agent": + tables[tid]["agent_id"] = ds.get("source_object_id") + + # Second pass: if a Tool -> Table relation exists, capture tool_id/tool_name and drop agent_id + for ds in data_sources: + src_type = ds.get("source_object_type") + tgt_type = ds.get("target_object_type") + if str(src_type).lower() == "tool" and str(tgt_type).lower() == "table": + tool_id = ds.get("source_object_id") + table_id = ds.get("target_object_id") + if table_id and tool_id and table_id in tables: + tables[table_id]["tool_id"] = tool_id + tables[table_id]["tool_name"] = ds.get("source_object_name") or "" + tables[table_id].pop("agent_id", None) + + if not tables: + return + + agent_name = card.get("name") or "" + select_rows = [] + agent_table_rows = [] + tool_table_rows = [] + for tid, meta in tables.items(): + name = meta.get("name") + tool_id = meta.get("tool_id") + tool_name = meta.get("tool_name") or "" + select_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(tid)} AS table_id, {_sq(name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + agent_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(agent_id)} AS agent_id, {_sq(agent_name)} AS agent_name, {_sq(agent_internal_id)} AS agent_internal_id, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, TIMESTAMP '{now_str}' AS now_ts") + if tool_id: + tool_table_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(tool_id)} AS tool_id, {_sq(tool_name)} AS tool_name, {_sq(tid)} AS table_id, {_sq(name)} AS table_name, TIMESTAMP '{now_str}' AS now_ts") + + union_all = "\nUNION ALL\n".join(select_rows) + _exec(conn, f""" + INSERT INTO {CORE}.tables ( + tenant_id, table_id, name, created_ts, updated_ts + ) + SELECT tenant_id, table_id, name, now_ts, now_ts + FROM ({union_all}) AS s + ON CONFLICT (table_id) + DO UPDATE SET + name = COALESCE(EXCLUDED.name, {CORE}.tables.name), + updated_ts = EXCLUDED.updated_ts + """, f"tables upsert ({len(select_rows)})") + + if agent_table_rows: + ua = "\nUNION ALL\n".join(agent_table_rows) + _exec(conn, f""" + INSERT INTO {CORE}.agent_tables ( + tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts + ) + SELECT tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, now_ts, now_ts + FROM ({ua}) AS s + WHERE agent_id IS NOT NULL AND agent_id <> '' + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = EXCLUDED.agent_name, + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, {CORE}.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """, f"agent_tables upsert ({len(agent_table_rows)})") + + if tool_table_rows: + ut = "\nUNION ALL\n".join(tool_table_rows) + _exec(conn, f""" + INSERT INTO {CORE}.tool_tables ( + tenant_id, tool_id, tool_name, table_id, table_name, + created_ts, updated_ts + ) + SELECT tenant_id, tool_id, tool_name, table_id, table_name, + now_ts, now_ts + FROM ({ut}) AS s + WHERE tool_id IS NOT NULL AND tool_id <> '' + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, {CORE}.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, {CORE}.tool_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """, f"tool_tables upsert ({len(tool_table_rows)})") + + +# --------------------------------------------------------------------------- +# Step Y — core.columns (tables -> columns) +# --------------------------------------------------------------------------- +def _upsert_columns(conn, card: dict, agent_internal_id: str, now_str: str): + ident = card.get("identification", {}) + tenant_id = ident.get("tenant_id") + data_sources = card.get("data_source", []) or [] + if not has_meaningful_data(data_sources): + return + # If tenant_id wasn't provided on the incoming card, try to read it from the agents table + if tenant_id is None: + rows = _query(conn, f"SELECT tenant_id FROM {CORE}.agents WHERE agent_internal_id = {_sq(agent_internal_id)} LIMIT 1") + if rows and rows[0].get("tenant_id"): + tenant_id = rows[0].get("tenant_id") + + # Columns are represented as relationships where source is Table and target is Column + select_rows = [] + tc_rows = [] + for ds in data_sources: + if str(ds.get("source_object_type")).lower() == "table" and str(ds.get("target_object_type")).lower() == "column": + table_id = ds.get("source_object_id") + table_name = ds.get("source_object_name") or "" + col_name = ds.get("target_object_name") + column_id = ds.get("target_object_id") or col_name + if table_id and col_name and column_id: + select_rows.append(f"SELECT {_sq(column_id)} AS column_id, {_sq(tenant_id)} AS tenant_id, {_sq(col_name)} AS name, TIMESTAMP '{now_str}' AS now_ts") + tc_rows.append(f"SELECT {_sq(tenant_id)} AS tenant_id, {_sq(table_id)} AS table_id, {_sq(table_name)} AS table_name, {_sq(col_name)} AS column_name, {_sq(column_id)} AS column_id, TIMESTAMP '{now_str}' AS now_ts") + + if not select_rows: + return + + union_all = "\nUNION ALL\n".join(select_rows) + _exec(conn, f""" + INSERT INTO {CORE}.columns ( + column_id, tenant_id, name, created_ts, updated_ts + ) + SELECT column_id, tenant_id, name, now_ts, now_ts + FROM ({union_all}) AS s + ON CONFLICT (column_id) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts + """, f"columns upsert ({len(select_rows)})") + + if tc_rows: + utc = "\nUNION ALL\n".join(tc_rows) + _exec(conn, f""" + INSERT INTO {CORE}.table_columns ( + tenant_id, table_id, table_name, column_name, column_id, created_ts, updated_ts + ) + SELECT tenant_id, table_id, table_name, column_name, column_id, now_ts, now_ts + FROM ({utc}) AS s + ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET + table_name = COALESCE(EXCLUDED.table_name, {CORE}.table_columns.table_name), + column_id = COALESCE(EXCLUDED.column_id, {CORE}.table_columns.column_id), + updated_ts = EXCLUDED.updated_ts + """, f"table_columns upsert ({len(tc_rows)})") + + # --------------------------------------------------------------------------- # Step 5 — core.agent_controls # --------------------------------------------------------------------------- @@ -1097,11 +1282,17 @@ def process_card_for_upload(card_dict: dict, tenant_id: Optional[str] = None) -> with _db() as conn: print("[INFO] Step 1/20 - agents") agent_internal_id = _upsert_agent(conn, card_dict, now_str, incoming_source_hash, tenant_id) + # Propagate tenant_id into the card's identification so downstream upserts can read it + if tenant_id is not None: + ident = card_dict.setdefault("identification", {}) + ident["tenant_id"] = tenant_id steps = [ (" 2/20 - agent_configurations", _upsert_agent_configuration), (" 3/20 - agent_identifications", _upsert_agent_identification), (" 4/20 - agent_tools", _upsert_agent_tools), + (" 4.1/20 - tables", _upsert_tables), + (" 4.2/20 - columns", _upsert_columns), (" 5/20 - agent_controls", _upsert_agent_controls), (" 6/20 - agent_knowledge_sources", _upsert_agent_knowledge_source), (" 7/20 - agent_llm_models", _upsert_agent_llm_models), diff --git a/sql/core/agent_tables.sql b/sql/core/agent_tables.sql new file mode 100644 index 0000000..3af0991 --- /dev/null +++ b/sql/core/agent_tables.sql @@ -0,0 +1,10 @@ +CREATE TABLE IF NOT EXISTS core.agent_tables ( + tenant_id TEXT, + agent_id TEXT, + agent_name TEXT, + agent_internal_id TEXT, + table_id TEXT, + table_name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/columns.sql b/sql/core/columns.sql new file mode 100644 index 0000000..1f8cb21 --- /dev/null +++ b/sql/core/columns.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS core.columns ( + column_id TEXT PRIMARY KEY, + tenant_id TEXT, + name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/table_columns.sql b/sql/core/table_columns.sql new file mode 100644 index 0000000..85c0cf2 --- /dev/null +++ b/sql/core/table_columns.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS core.table_columns ( + tenant_id TEXT, + table_id TEXT, + table_name TEXT, + column_name TEXT, + column_id TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/tables.sql b/sql/core/tables.sql new file mode 100644 index 0000000..99e2536 --- /dev/null +++ b/sql/core/tables.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS core.tables ( + tenant_id TEXT, + table_id TEXT PRIMARY KEY, + name TEXT, + country_of_provenance TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/tool_tables.sql b/sql/core/tool_tables.sql new file mode 100644 index 0000000..578f0d1 --- /dev/null +++ b/sql/core/tool_tables.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS core.tool_tables ( + tenant_id TEXT, + tool_id TEXT, + tool_name TEXT, + table_id TEXT, + table_name TEXT, + created_ts TIMESTAMP, + updated_ts TIMESTAMP +); diff --git a/sql/core/zz_agent_upsert_unique_indexes.sql b/sql/core/zz_agent_upsert_unique_indexes.sql index 73d6aa6..9c9c4cb 100644 --- a/sql/core/zz_agent_upsert_unique_indexes.sql +++ b/sql/core/zz_agent_upsert_unique_indexes.sql @@ -2,6 +2,9 @@ CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agents_current ON core.agents (agent_id, agent_name) WHERE is_current = true; +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agents_internal_id +ON core.agents (agent_internal_id); + CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agent_configurations_current ON core.agent_configurations (agent_internal_id) WHERE is_current = true; @@ -58,8 +61,121 @@ ON core.business_applications (business_application_id); CREATE UNIQUE INDEX IF NOT EXISTS ux_core_business_processes ON core.business_processes (business_process_id); +-- ux_core_columns removed: column_id is now the PRIMARY KEY + +-- ux_core_tables removed: table_id is already the PRIMARY KEY + +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_tool_tables +ON core.tool_tables (tenant_id, tool_id, table_id); + +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_agent_tables +ON core.agent_tables (tenant_id, agent_id, table_id); + +CREATE UNIQUE INDEX IF NOT EXISTS ux_core_table_columns +ON core.table_columns (tenant_id, table_id, column_name); + DO $$ BEGIN + IF to_regclass('core.tables') IS NOT NULL THEN + ALTER TABLE core.tables DROP CONSTRAINT IF EXISTS fk_core_tables_agent_tool; + DROP INDEX IF EXISTS core.ix_core_tables_agent_tool; + + IF to_regclass('core.agent_tables') IS NOT NULL + AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'agent_id' + ) + THEN + EXECUTE ' + INSERT INTO core.agent_tables ( + tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts + ) + SELECT + t.tenant_id, + t.agent_id, + ag.agent_name, + t.agent_internal_id, + t.table_id, + t.name, + COALESCE(t.created_ts, CURRENT_TIMESTAMP), + CURRENT_TIMESTAMP + FROM core.tables t + LEFT JOIN core.agents ag + ON ag.agent_id = t.agent_id + AND ag.agent_internal_id = t.agent_internal_id + WHERE t.agent_id IS NOT NULL + AND t.agent_id <> '''' + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = COALESCE(EXCLUDED.agent_name, core.agent_tables.agent_name), + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, core.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + '; + END IF; + + IF to_regclass('core.tool_tables') IS NOT NULL + AND EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'tool_id' + ) + THEN + EXECUTE ' + INSERT INTO core.tool_tables ( + tenant_id, tool_id, tool_name, table_id, table_name, + created_ts, updated_ts + ) + SELECT + t.tenant_id, + t.tool_id, + at.tool_name, + t.table_id, + t.name, + COALESCE(t.created_ts, CURRENT_TIMESTAMP), + CURRENT_TIMESTAMP + FROM core.tables t + LEFT JOIN core.agent_tools at + ON at.tool_id = t.tool_id + AND at.agent_internal_id = t.agent_internal_id + WHERE t.tool_id IS NOT NULL + AND t.tool_id <> '''' + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, core.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, core.tool_tables.table_name), + updated_ts = EXCLUDED.updated_ts + '; + END IF; + + IF to_regclass('core.tool_tables') IS NOT NULL THEN + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tool_tables' AND column_name = 'agent_id' + ) THEN + ALTER TABLE core.tool_tables DROP COLUMN agent_id; + END IF; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tool_tables' AND column_name = 'agent_internal_id' + ) THEN + ALTER TABLE core.tool_tables DROP COLUMN agent_internal_id; + END IF; + END IF; + + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'agent_id' + ) THEN + ALTER TABLE core.tables DROP COLUMN agent_id; + END IF; + + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'tool_id' + ) THEN + ALTER TABLE core.tables DROP COLUMN tool_id; + END IF; + END IF; + IF to_regclass('core.agent_ai_use_cases') IS NOT NULL THEN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns @@ -208,6 +324,40 @@ BEGIN ON DELETE CASCADE; END IF; + -- Create helper function + triggers to populate tenant_id on insert when missing + IF to_regclass('core.agents') IS NOT NULL THEN + EXECUTE $ddl$ + CREATE OR REPLACE FUNCTION core.populate_tenant_from_agent() RETURNS trigger AS $func$ + BEGIN + -- For rows that include agent_internal_id, prefer that lookup + IF NEW.tenant_id IS NULL OR NEW.tenant_id = '' THEN + IF TG_TABLE_NAME = 'agent_tools' THEN + IF NEW.agent_internal_id IS NOT NULL THEN + SELECT tenant_id INTO NEW.tenant_id FROM core.agents WHERE agent_internal_id = NEW.agent_internal_id LIMIT 1; + END IF; + END IF; + END IF; + RETURN NEW; + END; + $func$ LANGUAGE plpgsql; + $ddl$; + + -- Attach triggers to relevant tables + IF to_regclass('core.agent_tools') IS NOT NULL THEN + EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_agent_tools ON core.agent_tools'; + EXECUTE 'CREATE TRIGGER trg_populate_tenant_agent_tools BEFORE INSERT OR UPDATE ON core.agent_tools FOR EACH ROW EXECUTE FUNCTION core.populate_tenant_from_agent()'; + END IF; + + IF to_regclass('core.tables') IS NOT NULL THEN + EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_tables ON core.tables'; + EXECUTE 'CREATE TRIGGER trg_populate_tenant_tables BEFORE INSERT OR UPDATE ON core.tables FOR EACH ROW EXECUTE FUNCTION core.populate_tenant_from_agent()'; + END IF; + + IF to_regclass('core.columns') IS NOT NULL THEN + EXECUTE 'DROP TRIGGER IF EXISTS trg_populate_tenant_columns ON core.columns'; + END IF; + END IF; + IF NOT EXISTS ( SELECT 1 FROM pg_constraint @@ -251,4 +401,41 @@ BEGIN END IF; END IF; + IF to_regclass('core.table_columns') IS NOT NULL + AND NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'table_columns' AND column_name = 'column_id' + ) + THEN + ALTER TABLE core.table_columns ADD COLUMN column_id TEXT; + END IF; + + IF to_regclass('core.columns') IS NOT NULL THEN + ALTER TABLE core.columns DROP CONSTRAINT IF EXISTS fk_core_columns_table; + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'columns' AND column_name = 'column_id' + ) THEN + ALTER TABLE core.columns ADD COLUMN column_id TEXT; + UPDATE core.columns SET column_id = gen_random_uuid()::text WHERE column_id IS NULL; + ALTER TABLE core.columns ADD PRIMARY KEY (column_id); + END IF; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'columns' AND column_name = 'table_id' + ) THEN + ALTER TABLE core.columns DROP COLUMN table_id; + END IF; + END IF; + + IF to_regclass('core.tables') IS NOT NULL THEN + ALTER TABLE core.tables DROP CONSTRAINT IF EXISTS fk_core_tables_agent; + IF EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'core' AND table_name = 'tables' AND column_name = 'agent_internal_id' + ) THEN + ALTER TABLE core.tables DROP COLUMN agent_internal_id; + END IF; + END IF; + END $$; diff --git a/tavro_api/api/routers/agents.py b/tavro_api/api/routers/agents.py index 6f4f956..ab881fb 100644 --- a/tavro_api/api/routers/agents.py +++ b/tavro_api/api/routers/agents.py @@ -98,7 +98,9 @@ class AgentCreateRequest(BaseModel): role: Optional[str] = None environment: Optional[str] = None owner: Optional[str] = None - tools: Optional[List[Dict[str, str]]] = None + tools: Optional[List[Dict[str, Any]]] = None + tables: Optional[List[Dict[str, Any]]] = None + data_source: Optional[List[Dict[str, Any]]] = None knowledge_source: Optional[Dict[str, str]] = None @@ -184,14 +186,198 @@ def _agent_card_dir() -> Path: return Path(os.getenv("LOCAL_AGENT_CARD_DIR", "./agent_cards")) +def _clean_text(value: Any) -> Optional[str]: + if value is None: + return None + text_value = str(value).strip() + return text_value or None + + +def _column_names(raw_columns: Any) -> List[str]: + if not raw_columns: + return [] + if isinstance(raw_columns, str): + raw_columns = [raw_columns] + if not isinstance(raw_columns, list): + return [] + + names: List[str] = [] + seen: set[str] = set() + for col in raw_columns: + if isinstance(col, dict): + name = _clean_text(col.get("name") or col.get("column_name") or col.get("identifier")) + else: + name = _clean_text(col) + if name and name.lower() not in seen: + seen.add(name.lower()) + names.append(name) + return names + + +def _table_items(raw_tables: Any) -> List[Dict[str, Any]]: + if not raw_tables: + return [] + if isinstance(raw_tables, dict): + raw_tables = [raw_tables] + elif isinstance(raw_tables, str): + raw_tables = [{"name": raw_tables}] + if not isinstance(raw_tables, list): + return [] + + tables: List[Dict[str, Any]] = [] + for raw in raw_tables: + if isinstance(raw, str): + raw = {"name": raw} + if not isinstance(raw, dict): + continue + tables.append({ + "table_id": _clean_text(raw.get("table_id") or raw.get("id") or raw.get("identifier")), + "name": _clean_text(raw.get("name") or raw.get("table_name")), + "columns": _column_names(raw.get("columns") or raw.get("column")), + "tool_name": _clean_text(raw.get("tool_name") or raw.get("tool")), + "tool_id": _clean_text(raw.get("tool_id")), + }) + return tables + + +def _tables_from_tools(tools: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + tables: List[Dict[str, Any]] = [] + for tool in tools or []: + if not isinstance(tool, dict): + continue + tool_name = _clean_text(tool.get("name")) + tool_tables = _table_items(tool.get("tables") or tool.get("table")) + + # Also support the compact shape: + # { "name": "create_incident", "columns": ["id", "status"] } + if not tool_tables and tool.get("columns"): + tool_tables = [{ + "table_id": None, + "name": _clean_text(tool.get("table_name")) or (f"{tool_name} table" if tool_name else None), + "columns": _column_names(tool.get("columns")), + "tool_name": tool_name, + "tool_id": None, + }] + + for table in tool_tables: + table["tool_name"] = table.get("tool_name") or tool_name + tables.append(table) + return tables + + +def _tables_from_data_sources(data_sources: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + table_map: Dict[str, Dict[str, Any]] = {} + for entry in data_sources or []: + if not isinstance(entry, dict): + continue + src_type = str(entry.get("source_object_type") or "").lower() + tgt_type = str(entry.get("target_object_type") or "").lower() + if src_type == "table" and tgt_type == "column": + table_id = _clean_text(entry.get("source_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + { + "table_id": table_id, + "name": _clean_text(entry.get("source_object_name")), + "columns": [], + "tool_name": None, + "tool_id": None, + }, + ) + column_name = _clean_text(entry.get("target_object_name") or entry.get("target_object_id")) + if column_name and column_name not in item["columns"]: + item["columns"].append(column_name) + elif src_type == "agent" and tgt_type == "table": + table_id = _clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + { + "table_id": table_id, + "name": _clean_text(entry.get("target_object_name")), + "columns": [], + "tool_name": None, + "tool_id": None, + }, + ) + item["name"] = item.get("name") or _clean_text(entry.get("target_object_name")) + elif src_type == "tool" and tgt_type == "table": + table_id = _clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + { + "table_id": table_id, + "name": _clean_text(entry.get("target_object_name")), + "columns": [], + "tool_name": None, + "tool_id": None, + }, + ) + item["tool_id"] = _clean_text(entry.get("source_object_id")) + item["tool_name"] = _clean_text(entry.get("source_object_name")) + item["name"] = item.get("name") or _clean_text(entry.get("target_object_name")) + return list(table_map.values()) + + +def _normalize_tables_payload( + tables: Any, + tools: Optional[List[Dict[str, Any]]], + data_sources: Optional[List[Dict[str, Any]]], +) -> List[Dict[str, Any]]: + normalized: Dict[str, Dict[str, Any]] = {} + for table in [ + *_table_items(tables), + *_tables_from_tools(tools), + *_tables_from_data_sources(data_sources), + ]: + raw_table_id = table.get("table_id") + table_name = table.get("name") + if raw_table_id: + key = f"id:{raw_table_id}" + elif table_name: + key = f"name:{str(table_name).strip().lower()}" + else: + key = f"anonymous:{len(normalized)}" + item = normalized.setdefault( + key, + { + "table_id": raw_table_id, + "name": table_name, + "columns": [], + "tool_name": table.get("tool_name"), + "tool_id": table.get("tool_id"), + }, + ) + item["table_id"] = item.get("table_id") or raw_table_id + item["name"] = table_name or item.get("name") + item["tool_name"] = table.get("tool_name") or item.get("tool_name") + item["tool_id"] = table.get("tool_id") or item.get("tool_id") + existing_columns = {str(col).strip().lower() for col in item["columns"]} + for column_name in table.get("columns") or []: + column_key = str(column_name).strip().lower() + if column_key and column_key not in existing_columns: + item["columns"].append(column_name) + existing_columns.add(column_key) + + for item in normalized.values(): + item["table_id"] = item.get("table_id") or str(uuid.uuid4()) + return list(normalized.values()) + + def _write_agent_card( agent_id: str, agent_internal_id: str, agent_name: str, description: str, instruction: str, - tools: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, + tables: Optional[List[Dict[str, Any]]] = None, ) -> None: """Write a full agent card JSON file immediately after creation so get_agent_card returns complete details.""" try: @@ -202,7 +388,7 @@ def _write_agent_card( data_source_entries = [] if tools: for tool in tools: - tool_id = str(uuid.uuid4()) + tool_id = tool.get("identifier") or str(uuid.uuid4()) tool_entries.append({ "identifier": tool_id, "name": tool.get("name"), @@ -232,6 +418,46 @@ def _write_agent_card( "uses_pci": None, }) + for table in tables or []: + table_id = table.get("table_id") + table_name = table.get("name") + if not table_id: + continue + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table.get("tool_id") or agent_id, + "source_object_domain": None, + "source_object_name": table.get("tool_name") or agent_name, + "source_object_type": "Tool" if table.get("tool_id") else "Agent", + "target_object_id": table_id, + "target_object_domain": None, + "target_object_name": table_name, + "target_object_type": "Table", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + for column_name in table.get("columns") or []: + col_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table_id, + "source_object_domain": None, + "source_object_name": table_name, + "source_object_type": "Table", + "target_object_id": col_id, + "target_object_domain": None, + "target_object_name": column_name, + "target_object_type": "Column", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + ks_entry = None if knowledge_source: ks_entry = { @@ -352,8 +578,15 @@ async def create_agent( "environment": body.environment or None}, ) + tool_name_to_id: Dict[str, str] = {} + tools_for_card: List[Dict[str, Any]] = [] for tool in (body.tools or []): tool_id = str(uuid.uuid4()) + tool_name = tool.get("name", "") + tool_name_key = str(tool_name).strip().lower() + if tool_name_key: + tool_name_to_id[tool_name_key] = tool_id + tools_for_card.append({**tool, "identifier": tool_id}) await db.execute( text(f""" INSERT INTO {CORE}.agent_tools @@ -364,9 +597,193 @@ async def create_agent( :tname, :tdesc, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) """), {"tid": tenant_id, "iid": agent_internal_id, "tool_id": tool_id, - "aid": agent_id, "tname": tool.get("name", ""), "tdesc": tool.get("description", "")}, + "aid": agent_id, "tname": tool_name, "tdesc": tool.get("description", "")}, ) + tables_payload = _normalize_tables_payload(body.tables, body.tools, body.data_source) + for table in tables_payload: + tool_name_key = str(table.get("tool_name") or "").strip().lower() + if tool_name_key and not table.get("tool_id"): + table["tool_id"] = tool_name_to_id.get(tool_name_key) + + table_id = table.get("table_id") or str(uuid.uuid4()) + table["table_id"] = table_id + table_name = table.get("name") + table_tool_id = table.get("tool_id") + + await db.execute( + text(f""" + INSERT INTO {CORE}.tables + (tenant_id, table_id, name, created_ts, updated_ts) + VALUES + (:tid, :table_id, :name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (table_id) + DO UPDATE SET + name = COALESCE(EXCLUDED.name, {CORE}.tables.name), + updated_ts = EXCLUDED.updated_ts + """), + { + "tid": tenant_id, + "table_id": table_id, + "name": table_name, + }, + ) + + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_tables + (tenant_id, agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts) + VALUES + (:tid, :aid, :aname, :iid, :table_id, :table_name, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET + agent_name = EXCLUDED.agent_name, + agent_internal_id = EXCLUDED.agent_internal_id, + table_name = COALESCE(EXCLUDED.table_name, {CORE}.agent_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """), + {"tid": tenant_id, "aid": agent_id, "aname": body.agent_name, + "iid": agent_internal_id, "table_id": table_id, "table_name": table_name}, + ) + + if table_tool_id: + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_data_sources ( + tenant_id, agent_internal_id, agent_id, + created_ts, updated_ts, + source_object_id, source_object_name, source_object_type, + target_object_id, target_object_name, target_object_type + ) + VALUES ( + :tid, :iid, :aid, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, + :tool_id, :tool_name, 'Tool', + :table_id, :table_name, 'Table' + ) + ON CONFLICT (agent_internal_id, source_object_id, target_object_id) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts, + source_object_name = EXCLUDED.source_object_name, + target_object_name = EXCLUDED.target_object_name + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "aid": agent_id, + "tool_id": table_tool_id, + "tool_name": table.get("tool_name"), + "table_id": table_id, + "table_name": table_name, + }, + ) + await db.execute( + text(f""" + INSERT INTO {CORE}.tool_tables + (tenant_id, tool_id, tool_name, table_id, table_name, + created_ts, updated_ts) + VALUES + (:tid, :tool_id, :tool_name, :table_id, :table_name, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET + tool_name = COALESCE(EXCLUDED.tool_name, {CORE}.tool_tables.tool_name), + table_name = COALESCE(EXCLUDED.table_name, {CORE}.tool_tables.table_name), + updated_ts = EXCLUDED.updated_ts + """), + { + "tid": tenant_id, + "tool_id": table_tool_id, + "tool_name": table.get("tool_name"), + "table_id": table_id, + "table_name": table_name, + }, + ) + else: + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_data_sources ( + tenant_id, agent_internal_id, agent_id, + created_ts, updated_ts, + source_object_id, source_object_name, source_object_type, + target_object_id, target_object_name, target_object_type + ) + VALUES ( + :tid, :iid, :aid, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, + :aid, :agent_name, 'Agent', + :table_id, :table_name, 'Table' + ) + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "aid": agent_id, + "agent_name": body.agent_name, + "table_id": table_id, + "table_name": table_name, + }, + ) + + for column_name in table.get("columns") or []: + column_id = str(uuid.uuid5(uuid.NAMESPACE_OID, f"{table_id}:{column_name}")) + await db.execute( + text(f""" + INSERT INTO {CORE}.columns (column_id, tenant_id, name, created_ts, updated_ts) + VALUES (:col_id, :tid, :col_name, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (column_id) + DO UPDATE SET + tenant_id = EXCLUDED.tenant_id, + updated_ts = EXCLUDED.updated_ts + """), + {"col_id": column_id, "tid": tenant_id, "col_name": column_name}, + ) + await db.execute( + text(f""" + INSERT INTO {CORE}.table_columns + (tenant_id, table_id, table_name, column_name, column_id, created_ts, updated_ts) + VALUES + (:tid, :table_id, :table_name, :column_name, :col_id, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET + table_name = COALESCE(EXCLUDED.table_name, {CORE}.table_columns.table_name), + column_id = COALESCE(EXCLUDED.column_id, {CORE}.table_columns.column_id), + updated_ts = EXCLUDED.updated_ts + """), + {"tid": tenant_id, "table_id": table_id, + "table_name": table_name, "column_name": column_name, "col_id": column_id}, + ) + await db.execute( + text(f""" + INSERT INTO {CORE}.agent_data_sources ( + tenant_id, agent_internal_id, agent_id, + created_ts, updated_ts, + source_object_id, source_object_name, source_object_type, + target_object_id, target_object_name, target_object_type + ) + VALUES ( + :tid, :iid, :aid, + CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, + :table_id, :table_name, 'Table', + :col_id, :column_name, 'Column' + ) + ON CONFLICT (agent_internal_id, source_object_id, target_object_id) + DO UPDATE SET + updated_ts = EXCLUDED.updated_ts, + source_object_name = EXCLUDED.source_object_name, + target_object_name = EXCLUDED.target_object_name + """), + { + "tid": tenant_id, + "iid": agent_internal_id, + "aid": agent_id, + "table_id": table_id, + "table_name": table_name, + "col_id": column_id, + "column_name": column_name, + }, + ) + if body.knowledge_source: await db.execute( text(f""" @@ -415,8 +832,9 @@ async def create_agent( agent_name=body.agent_name, description=body.description, instruction=body.instruction, - tools=body.tools, + tools=tools_for_card, knowledge_source=body.knowledge_source, + tables=tables_payload, ) background_tasks.add_task( diff --git a/tavro_app/src/services/agentApi.ts b/tavro_app/src/services/agentApi.ts index 5a6a708..f27bce2 100644 --- a/tavro_app/src/services/agentApi.ts +++ b/tavro_app/src/services/agentApi.ts @@ -47,7 +47,9 @@ export interface AgentCreatePayload { role?: string; environment?: string; owner?: string; - tools?: Array<{ name: string; description: string }>; + tools?: Array<{ name: string; description: string; table?: any; tables?: any[]; columns?: any[] }>; + tables?: Array<{ table_id?: string; name?: string; table_name?: string; columns?: any[]; tool_name?: string; tool_id?: string }>; + data_source?: Array>; knowledge_source?: { name: string; description: string }; } diff --git a/tavro_app/src/services/mcpClient.ts b/tavro_app/src/services/mcpClient.ts index de07090..98c8b7a 100644 --- a/tavro_app/src/services/mcpClient.ts +++ b/tavro_app/src/services/mcpClient.ts @@ -1184,6 +1184,9 @@ Every generated value must be coherent with the blueprint. Do not fabricate data description, instruction, ...(args?.tools ? { tools: args.tools } : {}), + ...(args?.tables ? { tables: args.tables } : {}), + ...(args?.columns ? { columns: args.columns } : {}), + ...(args?.data_source ? { data_source: args.data_source } : {}), ...(args?.knowledge_source ? { knowledge_source: args.knowledge_source } : {}), ...(args?.original_prompt ? { original_prompt: args.original_prompt } : {}), }; diff --git a/tavro_library/agent_library.py b/tavro_library/agent_library.py index a39ba9e..8d72f2a 100644 --- a/tavro_library/agent_library.py +++ b/tavro_library/agent_library.py @@ -394,6 +394,77 @@ def get_agent_card(cls, agent_name: Optional[str] = None, agent_id: Optional[str except Exception as ra_overlay_err: print(f"[get_agent_card] Risk assessment overlay failed (returning card as-is): {ra_overlay_err}") + # Overlay data_source from DB so renames and new relationships are + # immediately visible in the UI lineage without regenerating the card file. + try: + ds_rows = cls.execute_select( + f""" + SELECT relationship_id, parent_relationship_id, + source_object_id, source_object_domain, source_object_name, source_object_type, + target_object_id, target_object_domain, target_object_name, target_object_type, + access_level, contains_pii, contains_phi, contains_pci + FROM {cls.CORE_DB_NAME}.agent_data_sources + WHERE agent_id = %s + ORDER BY created_ts NULLS LAST + """, + (agent_id_clean,), + ) + if ds_rows: + local_card["data_source"] = [ + { + "relationship_id": r.get("relationship_id"), + "parent_relationship_id": r.get("parent_relationship_id"), + "source_object_id": r.get("source_object_id"), + "source_object_domain": r.get("source_object_domain"), + "source_object_name": r.get("source_object_name"), + "source_object_type": r.get("source_object_type"), + "target_object_id": r.get("target_object_id"), + "target_object_domain": r.get("target_object_domain"), + "target_object_name": r.get("target_object_name"), + "target_object_type": r.get("target_object_type"), + "access_level": r.get("access_level"), + "uses_pii": r.get("contains_pii"), + "uses_phi": r.get("contains_phi"), + "uses_pci": r.get("contains_pci"), + } + for r in ds_rows + ] + except Exception as ds_overlay_err: + print(f"[get_agent_card] Data source overlay failed (returning card as-is): {ds_overlay_err}") + + # Overlay tool list from DB so tools added via update_agent are visible. + try: + tool_rows = cls.execute_select( + f""" + SELECT tool_id, tool_name, tool_description, + delegation_possible, allowed_delegates, + input_schema_json_text, output_schema_json_text, + default_config_json_text + FROM {cls.CORE_DB_NAME}.agent_tools + WHERE agent_id = %s + ORDER BY created_ts NULLS LAST + """, + (agent_id_clean,), + ) + if tool_rows: + local_card["tool"] = [ + { + "identifier": r.get("tool_id"), + "name": r.get("tool_name"), + "description": r.get("tool_description"), + "delegation_possible": r.get("delegation_possible"), + "allowed_delegates": r.get("allowed_delegates"), + "parameter_name": None, + "parameter_type": None, + "default_value": r.get("default_config_json_text"), + "input_schema": r.get("input_schema_json_text"), + "output_schema": r.get("output_schema_json_text"), + } + for r in tool_rows + ] + except Exception as tool_overlay_err: + print(f"[get_agent_card] Tool overlay failed (returning card as-is): {tool_overlay_err}") + return local_card # ---------- 7. Not found ---------- @@ -525,6 +596,249 @@ def _send(): @staticmethod def sanitize(val: str) -> str: return val.replace("'", "''") if val else val + + @staticmethod + def _clean_text(value: Optional[Any]) -> Optional[str]: + if value is None: + return None + text_value = str(value).strip() + return text_value or None + + @classmethod + def _column_names(cls, raw_columns: Any) -> List[str]: + if not raw_columns: + return [] + if isinstance(raw_columns, str): + raw_columns = [raw_columns] + if not isinstance(raw_columns, list): + return [] + + names: List[str] = [] + seen = set() + for col in raw_columns: + if isinstance(col, dict): + name = cls._clean_text(col.get("name") or col.get("column_name") or col.get("identifier")) + else: + name = cls._clean_text(col) + if name and name.lower() not in seen: + seen.add(name.lower()) + names.append(name) + return names + + @classmethod + def _table_items(cls, raw_tables: Any) -> List[Dict[str, Any]]: + if not raw_tables: + return [] + if isinstance(raw_tables, dict): + raw_tables = [raw_tables] + elif isinstance(raw_tables, str): + raw_tables = [{"name": raw_tables}] + if not isinstance(raw_tables, list): + return [] + + tables: List[Dict[str, Any]] = [] + for raw in raw_tables: + if isinstance(raw, str): + raw = {"name": raw} + if not isinstance(raw, dict): + continue + tables.append({ + "table_id": cls._clean_text(raw.get("table_id") or raw.get("id") or raw.get("identifier")), + "name": cls._clean_text(raw.get("name") or raw.get("table_name")), + "tool_name": cls._clean_text(raw.get("tool_name") or raw.get("tool")), + "tool_id": cls._clean_text(raw.get("tool_id")), + }) + return tables + + @classmethod + def _column_items(cls, raw_columns: Any) -> List[Dict[str, Any]]: + if not raw_columns: + return [] + if isinstance(raw_columns, dict): + raw_columns = [raw_columns] + elif isinstance(raw_columns, str): + raw_columns = [{"name": raw_columns}] + if not isinstance(raw_columns, list): + return [] + + columns: List[Dict[str, Any]] = [] + seen = set() + for raw in raw_columns: + if isinstance(raw, str): + raw = {"name": raw} + if not isinstance(raw, dict): + continue + name = cls._clean_text(raw.get("name") or raw.get("column_name") or raw.get("identifier")) + if not name: + continue + table_id = cls._clean_text(raw.get("table_id")) + table_name = cls._clean_text(raw.get("table_name") or raw.get("table")) + key = (name.lower(), (table_id or "").lower(), (table_name or "").lower()) + if key in seen: + continue + seen.add(key) + columns.append({ + "name": name, + "table_id": table_id, + "table_name": table_name, + }) + return columns + + @classmethod + def _tables_from_tools(cls, tools: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + tables: List[Dict[str, Any]] = [] + for tool in tools or []: + if not isinstance(tool, dict): + continue + tool_name = cls._clean_text(tool.get("name")) + tool_tables = cls._table_items(tool.get("tables") or tool.get("table")) + + for table in tool_tables: + table["tool_name"] = table.get("tool_name") or tool_name + tables.append(table) + return tables + + @classmethod + def _tables_from_data_sources(cls, data_sources: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + table_map: Dict[str, Dict[str, Any]] = {} + for entry in data_sources or []: + if not isinstance(entry, dict): + continue + src_type = str(entry.get("source_object_type") or "").lower() + tgt_type = str(entry.get("target_object_type") or "").lower() + if src_type == "table" and tgt_type == "column": + table_id = cls._clean_text(entry.get("source_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + {"table_id": table_id, "name": cls._clean_text(entry.get("source_object_name")), "tool_name": None, "tool_id": None}, + ) + elif src_type == "agent" and tgt_type == "table": + table_id = cls._clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + {"table_id": table_id, "name": cls._clean_text(entry.get("target_object_name")), "tool_name": None, "tool_id": None}, + ) + item["name"] = item.get("name") or cls._clean_text(entry.get("target_object_name")) + elif src_type == "tool" and tgt_type == "table": + table_id = cls._clean_text(entry.get("target_object_id")) + if not table_id: + continue + item = table_map.setdefault( + table_id, + {"table_id": table_id, "name": cls._clean_text(entry.get("target_object_name")), "tool_name": None, "tool_id": None}, + ) + item["tool_id"] = cls._clean_text(entry.get("source_object_id")) + item["tool_name"] = cls._clean_text(entry.get("source_object_name")) + item["name"] = item.get("name") or cls._clean_text(entry.get("target_object_name")) + return list(table_map.values()) + + @classmethod + def _columns_from_data_sources(cls, data_sources: Optional[List[Dict[str, Any]]]) -> List[Dict[str, Any]]: + columns: List[Dict[str, Any]] = [] + for entry in data_sources or []: + if not isinstance(entry, dict): + continue + src_type = str(entry.get("source_object_type") or "").lower() + tgt_type = str(entry.get("target_object_type") or "").lower() + if src_type != "table" or tgt_type != "column": + continue + column_name = cls._clean_text(entry.get("target_object_name") or entry.get("target_object_id")) + if not column_name: + continue + columns.append({ + "name": column_name, + "table_id": cls._clean_text(entry.get("source_object_id")), + "table_name": cls._clean_text(entry.get("source_object_name")), + }) + return columns + + @classmethod + def _normalize_tables_payload( + cls, + tables: Any, + tools: Optional[List[Dict[str, Any]]], + data_sources: Optional[List[Dict[str, Any]]], + ) -> List[Dict[str, Any]]: + normalized: Dict[str, Dict[str, Any]] = {} + for table in [ + *cls._table_items(tables), + *cls._tables_from_tools(tools), + *cls._tables_from_data_sources(data_sources), + ]: + raw_table_id = table.get("table_id") + table_name = table.get("name") + if raw_table_id: + key = f"id:{raw_table_id}" + elif table_name: + key = f"name:{str(table_name).strip().lower()}" + else: + key = f"anonymous:{len(normalized)}" + item = normalized.setdefault( + key, + { + "table_id": raw_table_id, + "source_table_id": raw_table_id, + "name": table_name, + "tool_name": table.get("tool_name"), + "tool_id": table.get("tool_id"), + }, + ) + item["table_id"] = item.get("table_id") or raw_table_id + item["source_table_id"] = item.get("source_table_id") or raw_table_id + item["name"] = table_name or item.get("name") + item["tool_name"] = table.get("tool_name") or item.get("tool_name") + item["tool_id"] = table.get("tool_id") or item.get("tool_id") + + for item in normalized.values(): + item["table_id"] = str(uuid.uuid4()) + return list(normalized.values()) + + @classmethod + def _columns_by_table( + cls, + tables_payload: List[Dict[str, Any]], + columns: Any, + data_sources: Optional[List[Dict[str, Any]]], + ) -> Dict[int, List[str]]: + column_entries = [ + *cls._column_items(columns), + *cls._columns_from_data_sources(data_sources), + ] + columns_by_table: Dict[int, List[str]] = {} + + for col_entry in column_entries: + col_name = cls._clean_text(col_entry.get("name")) + if not col_name: + continue + match_id = str(col_entry.get("table_id") or "").strip() + match_name = str(col_entry.get("table_name") or "").strip().lower() + + matched_index: Optional[int] = None + for index, tbl in enumerate(tables_payload): + table_ids = { + str(tbl.get("table_id") or "").strip(), + str(tbl.get("source_table_id") or "").strip(), + } + table_name = str(tbl.get("name") or "").strip().lower() + if (match_id and match_id in table_ids) or (match_name and table_name == match_name): + matched_index = index + break + + if matched_index is None and len(tables_payload) == 1 and not match_id and not match_name: + matched_index = 0 + if matched_index is None: + continue + + existing = {name.strip().lower() for name in columns_by_table.get(matched_index, [])} + col_key = col_name.strip().lower() + if col_key not in existing: + columns_by_table.setdefault(matched_index, []).append(col_name) + + return columns_by_table @staticmethod def _normalize_tenant_id(value: Optional[Any]) -> Optional[str]: @@ -655,9 +969,11 @@ def _write_agent_card( agent_name: str, description: str, instruction: str, - tools: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tool_ids: Optional[List[str]] = None, + tables: Optional[List[Dict[str, Any]]] = None, + columns_by_table: Optional[Dict[int, List[str]]] = None, ) -> None: """Write a full agent card JSON file immediately after creation so get_agent_card returns complete details.""" try: @@ -697,6 +1013,46 @@ def _write_agent_card( "uses_pci": None, }) + for table_index, table in enumerate(tables or []): + table_id = table.get("table_id") + table_name = table.get("name") + if not table_id: + continue + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table.get("tool_id") or agent_id, + "source_object_domain": None, + "source_object_name": table.get("tool_name") or agent_name, + "source_object_type": "Tool" if table.get("tool_id") else "Agent", + "target_object_id": table_id, + "target_object_domain": None, + "target_object_name": table_name, + "target_object_type": "Table", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + for column_name in (columns_by_table or {}).get(table_index, []): + col_id = str(uuid.uuid4()) + data_source_entries.append({ + "relationship_id": None, + "parent_relationship_id": None, + "source_object_id": table_id, + "source_object_domain": None, + "source_object_name": table_name, + "source_object_type": "Table", + "target_object_id": col_id, + "target_object_domain": None, + "target_object_name": column_name, + "target_object_type": "Column", + "access_level": None, + "uses_pii": None, + "uses_phi": None, + "uses_pci": None, + }) + ks_entry = None if knowledge_source: ks_entry = { @@ -777,7 +1133,10 @@ def create_agent( agent_name: str, description: str, instruction: str, - tools: Optional[List[Dict[str, str]]] = None, + tools: Optional[List[Dict[str, Any]]] = None, + tables: Optional[List[Dict[str, Any]]] = None, + columns: Optional[List[Dict[str, Any]]] = None, + data_source: Optional[List[Dict[str, Any]]] = None, knowledge_source: Optional[Dict[str, str]] = None, tenant_id: Optional[str] = None )-> Dict[str, Any]: @@ -798,7 +1157,15 @@ def create_agent( queries = [] data_source_values = [] + table_values = [] + column_values = [] + agent_table_values = [] + tool_table_values = [] + table_column_values = [] tool_ids_for_card: List[str] = [] + tool_name_to_id: Dict[str, str] = {} + tables_payload = cls._normalize_tables_payload(tables, tools, data_source) + columns_by_table = cls._columns_by_table(tables_payload, columns, data_source) # 1. agents table tenant_id_value = f"'{tenant_id}'," if tenant_id else "" @@ -856,6 +1223,8 @@ def create_agent( tool_ids_for_card.append(tool_id) name = cls.sanitize(tool.get("name")) desc = cls.sanitize(tool.get("description")) + if name: + tool_name_to_id[str(tool.get("name")).strip().lower()] = tool_id values_list.append(f""" ( @@ -902,6 +1271,186 @@ def create_agent( """ queries.append(tools_query) + for table_index, table in enumerate(tables_payload): + tool_name_key = str(table.get("tool_name") or "").strip().lower() + if tool_name_key and not table.get("tool_id"): + table["tool_id"] = tool_name_to_id.get(tool_name_key) + + table_id = str(uuid.uuid4()) + table["table_id"] = table_id + table_name = cls.sanitize(table.get("name") or "") + table_tool_id = cls.sanitize(table.get("tool_id") or "") + table_tool_name = cls.sanitize(table.get("tool_name") or "") + + table_values.append(f""" + ( + {tenant_id_value} + '{table_id}', + '{table_name}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + + # agent_tables relationship + agent_table_values.append(f""" + ( + {tenant_id_value} + '{agent_id}', + '{agent_name}', + '{agent_internal_id}', + '{table_id}', + '{table_name}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + + if table_tool_id: + data_source_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + '{agent_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}', + '{table_tool_id}', + '{table_tool_name}', + 'Tool', + '{table_id}', + '{table_name}', + 'Table' + ) + """) + # tool_tables relationship + tool_table_values.append(f""" + ( + {tenant_id_value} + '{table_tool_id}', + '{table_tool_name}', + '{table_id}', + '{table_name}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + else: + data_source_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + '{agent_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}', + '{agent_id}', + '{agent_name}', + 'Agent', + '{table_id}', + '{table_name}', + 'Table' + ) + """) + + for column_name in columns_by_table.get(table_index, []): + clean_column = cls.sanitize(column_name) + if not clean_column: + continue + column_id = str(uuid.uuid4()) + column_values.append(f""" + ( + '{column_id}', + {tenant_id_value} + '{clean_column}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + # table_columns relationship + table_column_values.append(f""" + ( + {tenant_id_value} + '{table_id}', + '{table_name}', + '{clean_column}', + '{column_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}' + ) + """) + data_source_values.append(f""" + ( + {tenant_id_value} + '{agent_internal_id}', + '{agent_id}', + TIMESTAMP '{now}', + TIMESTAMP '{now}', + '{table_id}', + '{table_name}', + 'Table', + '{column_id}', + '{clean_column}', + 'Column' + ) + """) + + if table_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.tables ( + {tenant_id_column} + table_id, + name, + created_ts, + updated_ts + ) + VALUES + {",".join(table_values)} + """) + + if column_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.columns ( + column_id, + {tenant_id_column} + name, + created_ts, + updated_ts + ) + VALUES + {",".join(column_values)} + """) + + if agent_table_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.agent_tables ( + {tenant_id_column} + agent_id, agent_name, agent_internal_id, + table_id, table_name, created_ts, updated_ts + ) + VALUES + {",".join(agent_table_values)} + """) + + if tool_table_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.tool_tables ( + {tenant_id_column} + tool_id, tool_name, table_id, table_name, + created_ts, updated_ts + ) + VALUES + {",".join(tool_table_values)} + """) + + if table_column_values: + queries.append(f""" + INSERT INTO {cls.CORE_DB_NAME}.table_columns ( + {tenant_id_column} + table_id, table_name, column_name, column_id, created_ts, updated_ts + ) + VALUES + {",".join(table_column_values)} + """) + # 4. knowledge sources (ONLY name + description) if knowledge_source: ks_name = cls.sanitize(knowledge_source.get("name")) @@ -927,7 +1476,7 @@ def create_agent( ) """) - # 5. data source insert (only if tools exist) + # 5. data source insert if data_source_values: queries.append(f""" INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources ( @@ -961,6 +1510,8 @@ def create_agent( tools=tools, knowledge_source=knowledge_source, tool_ids=tool_ids_for_card, + tables=tables_payload, + columns_by_table=columns_by_table, ) payload = { @@ -1693,6 +2244,9 @@ def update_agent( instruction: Optional[str] = None, tools: Optional[List[Dict[str, str]]] = None, knowledge_source: Optional[Dict[str, str]] = None, + tables: Optional[List[Dict[str, Any]]] = None, + columns: Optional[List[Dict[str, Any]]] = None, + data_source: Optional[List[Dict[str, Any]]] = None, tenant_id: Optional[str] = None ) -> Dict[str, Any]: """ @@ -1717,11 +2271,12 @@ def update_agent( agent_id = cls.sanitize(str(agent_id).strip()) # Fetch agent info (1 query) - rows = cls.execute_select(f"SELECT agent_internal_id FROM {cls.CORE_DB_NAME}.agents WHERE agent_id = '{agent_id}' AND is_current = true {tenant_where} LIMIT 1") + rows = cls.execute_select(f"SELECT agent_internal_id, agent_name FROM {cls.CORE_DB_NAME}.agents WHERE agent_id = '{agent_id}' AND is_current = true {tenant_where} LIMIT 1") if not rows: raise ValueError(f"Agent '{agent_id}' not found.") - + agent_internal_id = rows[0].get("agent_internal_id") + current_agent_name = cls.sanitize(str(rows[0].get("agent_name") or "").strip()) now = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Batch updates into single transaction @@ -1737,12 +2292,37 @@ def update_agent( cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_identifications ({tenant_col}agent_internal_id, agent_id, instruction, created_ts, updated_ts, is_current) VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', '{instr}', TIMESTAMP '{now}', TIMESTAMP '{now}', true)") if tools: - cls.execute_dml(f"DELETE FROM {cls.CORE_DB_NAME}.agent_tools WHERE agent_id = '{agent_id}' {tenant_where}") - vals = [] + tool_rows = [] + tool_ds_rows = [] for t in tools: - vals.append(f"({tenant_val}'{agent_internal_id}', '{agent_id}', '{cls.sanitize(t.get('name', ''))}', '{cls.sanitize(t.get('description', ''))}', TIMESTAMP '{now}', TIMESTAMP '{now}')") - if vals: - cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_tools ({tenant_col}agent_internal_id, agent_id, tool_name, tool_description, created_ts, updated_ts) VALUES {','.join(vals)}") + tool_id = str(uuid.uuid4()) + t_name = cls.sanitize(t.get("name", "")) + t_desc = cls.sanitize(t.get("description", "")) + tool_rows.append( + f"({tenant_val}'{agent_internal_id}', '{tool_id}', '{agent_id}', " + f"'{t_name}', '{t_desc}', TIMESTAMP '{now}', TIMESTAMP '{now}')" + ) + tool_ds_rows.append( + f"({tenant_val}'{agent_internal_id}', '{agent_id}', " + f"NULL, NULL::boolean, NULL::boolean, NULL::boolean, " + f"TIMESTAMP '{now}', TIMESTAMP '{now}', " + f"'{agent_id}', NULL, '{current_agent_name}', 'Agent', " + f"'{tool_id}', NULL, '{t_name}', 'Tool')" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_tools " + f"({tenant_col}agent_internal_id, tool_id, agent_id, tool_name, tool_description, created_ts, updated_ts) " + f"VALUES {','.join(tool_rows)}" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources " + f"({tenant_col}agent_internal_id, agent_id, " + f"access_level, contains_pii, contains_phi, contains_pci, " + f"created_ts, updated_ts, " + f"source_object_id, source_object_domain, source_object_name, source_object_type, " + f"target_object_id, target_object_domain, target_object_name, target_object_type) " + f"VALUES {','.join(tool_ds_rows)}" + ) if knowledge_source: cls.execute_dml(f"DELETE FROM {cls.CORE_DB_NAME}.agent_knowledge_sources WHERE agent_id = '{agent_id}' {tenant_where}") @@ -1750,7 +2330,212 @@ def update_agent( ks_desc = cls.sanitize(knowledge_source.get("description", "")) cls.execute_dml(f"INSERT INTO {cls.CORE_DB_NAME}.agent_knowledge_sources ({tenant_col}agent_internal_id, agent_id, name, description, created_ts, updated_ts) VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', '{ks_name}', '{ks_desc}', TIMESTAMP '{now}', TIMESTAMP '{now}')") - return {"message": "Agent updated successfully.", "agent_id": agent_id} + # Merge data_source entries into tables so both paths produce the same result + if data_source: + extra = cls._normalize_tables_payload(None, None, data_source) + tables = list(tables or []) + extra + tables_for_update = [table for table in (tables or []) if isinstance(table, dict)] + columns_for_new_tables = [ + col for col in (columns or []) + if isinstance(col, dict) and not col.get("old_name") + ] + columns_by_table = cls._columns_by_table(tables_for_update, columns_for_new_tables, data_source) + + tables_updated = 0 + for table_index, table in enumerate(tables_for_update): + new_name = cls.sanitize(str(table.get("name") or "").strip()) + if not new_name: + continue + + old_name = cls.sanitize(str(table.get("old_name") or "").strip()) + + if old_name: + # ── RENAME path ────────────────────────────────────────────── + table_id = cls.sanitize(str(table.get("table_id") or "").strip()) + if not table_id: + found = cls.execute_select( + f"SELECT table_id FROM {cls.CORE_DB_NAME}.tables " + f"WHERE LOWER(name) = LOWER('{old_name}') {tenant_where} LIMIT 1" + ) + if found: + table_id = cls.sanitize(str(found[0].get("table_id") or "").strip()) + if not table_id: + continue + + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.tables SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}' AND agent_id = '{agent_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.tool_tables SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.table_columns SET table_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE table_id = '{table_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET target_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND target_object_id = '{table_id}' " + f"AND LOWER(target_object_type) = 'table'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET source_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND source_object_id = '{table_id}' " + f"AND LOWER(source_object_type) = 'table'" + ) + + else: + # ── INSERT path (new table) ─────────────────────────────────── + table_id = str(uuid.uuid4()) + + tbl_tool_name = cls.sanitize(str(table.get("tool_name") or "").strip()) + tbl_tool_id = cls.sanitize(str(table.get("tool_id") or "").strip()) + if tbl_tool_name and not tbl_tool_id: + tool_rows = cls.execute_select( + f"SELECT tool_id FROM {cls.CORE_DB_NAME}.agent_tools " + f"WHERE agent_id = '{agent_id}' AND LOWER(tool_name) = LOWER('{tbl_tool_name}') {tenant_where} LIMIT 1" + ) + if tool_rows: + tbl_tool_id = cls.sanitize(str(tool_rows[0].get("tool_id") or "").strip()) + + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.tables ({tenant_col}table_id, name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{table_id}', '{new_name}', TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (table_id) DO UPDATE SET " + f"name = COALESCE(EXCLUDED.name, {cls.CORE_DB_NAME}.tables.name), updated_ts = EXCLUDED.updated_ts" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_tables " + f"({tenant_col}agent_id, agent_name, agent_internal_id, table_id, table_name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{agent_id}', '{current_agent_name}', '{agent_internal_id}', " + f"'{table_id}', '{new_name}', TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (tenant_id, agent_id, table_id) DO UPDATE SET " + f"table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.agent_tables.table_name), " + f"updated_ts = EXCLUDED.updated_ts" + ) + + src_id = tbl_tool_id or agent_id + src_name = tbl_tool_name or current_agent_name + src_type = 'Tool' if tbl_tool_id else 'Agent' + + if tbl_tool_id: + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.tool_tables " + f"({tenant_col}tool_id, tool_name, table_id, table_name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{tbl_tool_id}', '{tbl_tool_name}', '{table_id}', '{new_name}', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (tenant_id, tool_id, table_id) DO UPDATE SET " + f"table_name = COALESCE(EXCLUDED.table_name, {cls.CORE_DB_NAME}.tool_tables.table_name), " + f"updated_ts = EXCLUDED.updated_ts" + ) + + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources " + f"({tenant_col}agent_internal_id, agent_id, " + f"source_object_id, source_object_name, source_object_type, " + f"target_object_id, target_object_name, target_object_type, " + f"created_ts, updated_ts) " + f"VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', " + f"'{src_id}', '{src_name}', '{src_type}', " + f"'{table_id}', '{new_name}', 'Table', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (agent_internal_id, source_object_id, target_object_id) DO UPDATE SET " + f"source_object_name = EXCLUDED.source_object_name, " + f"target_object_name = EXCLUDED.target_object_name, updated_ts = EXCLUDED.updated_ts" + ) + + for column_name in columns_by_table.get(table_index, []): + clean_col = cls.sanitize(str(column_name).strip()) + if not clean_col: + continue + col_id = str(uuid.uuid4()) + + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.columns ({tenant_col}column_id, name, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{col_id}', '{clean_col}', TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (column_id) DO UPDATE SET updated_ts = EXCLUDED.updated_ts" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.table_columns " + f"({tenant_col}table_id, table_name, column_name, column_id, created_ts, updated_ts) " + f"VALUES ({tenant_val}'{table_id}', '{new_name}', '{clean_col}', '{col_id}', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (tenant_id, table_id, column_name) DO UPDATE SET " + f"column_id = COALESCE(EXCLUDED.column_id, {cls.CORE_DB_NAME}.table_columns.column_id), " + f"updated_ts = EXCLUDED.updated_ts" + ) + cls.execute_dml( + f"INSERT INTO {cls.CORE_DB_NAME}.agent_data_sources " + f"({tenant_col}agent_internal_id, agent_id, " + f"source_object_id, source_object_name, source_object_type, " + f"target_object_id, target_object_name, target_object_type, " + f"created_ts, updated_ts) " + f"VALUES ({tenant_val}'{agent_internal_id}', '{agent_id}', " + f"'{table_id}', '{new_name}', 'Table', " + f"'{col_id}', '{clean_col}', 'Column', " + f"TIMESTAMP '{now}', TIMESTAMP '{now}') " + f"ON CONFLICT (agent_internal_id, source_object_id, target_object_id) DO UPDATE SET " + f"source_object_name = EXCLUDED.source_object_name, " + f"target_object_name = EXCLUDED.target_object_name, updated_ts = EXCLUDED.updated_ts" + ) + + tables_updated += 1 + + columns_updated = 0 + for col in (columns or []): + if not isinstance(col, dict): + continue + new_name = cls.sanitize(str(col.get("name") or "").strip()) + old_name = cls.sanitize(str(col.get("old_name") or "").strip()) + if not new_name or not old_name: + continue + + scoped_table_id = cls.sanitize(str(col.get("table_id") or "").strip()) + table_filter = f"AND table_id = '{scoped_table_id}'" if scoped_table_id else "" + tenant_filter = f"AND tenant_id = '{tenant_clean}'" if is_tenant else "" + + found = cls.execute_select( + f"SELECT column_id FROM {cls.CORE_DB_NAME}.table_columns " + f"WHERE LOWER(column_name) = LOWER('{old_name}') {table_filter} {tenant_filter} LIMIT 1" + ) + if not found: + continue + column_id = cls.sanitize(str(found[0].get("column_id") or "").strip()) + if not column_id: + continue + + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.columns SET name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE column_id = '{column_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.table_columns SET column_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE column_id = '{column_id}'" + ) + cls.execute_dml( + f"UPDATE {cls.CORE_DB_NAME}.agent_data_sources " + f"SET target_object_name = '{new_name}', updated_ts = TIMESTAMP '{now}' " + f"WHERE agent_internal_id = '{agent_internal_id}' " + f"AND LOWER(target_object_type) = 'column' " + f"AND LOWER(target_object_name) = LOWER('{old_name}')" + ) + columns_updated += 1 + + msg = "Agent updated successfully." + if tables_updated: + msg += f" {tables_updated} table(s) renamed." + if columns_updated: + msg += f" {columns_updated} column(s) renamed." + return {"message": msg, "agent_id": agent_id} @classmethod def delete_agent(