graph_accel: keep accelerated path finding on semantic Concept edges

Summary of the changes in this patch:
  * find_paths: truncate the graph_accel result to max_paths before
    logging and returning it.
  * _accel_path_rows_to_dict and _find_paths_accel: a path that contains
    any row without an app_id is dropped (None / skipped) instead of being
    rendered with a fallback label.
  * _INFRA_EDGE_TYPES and _set_accel_gucs (new): when the backend reports
    'not_loaded', set graph_accel.node_labels to 'Concept' and
    graph_accel.edge_types to every edge label of 'knowledge_graph' that is
    not in _INFRA_EDGE_TYPES, then call graph_accel_load().
  * tests: cover phantom-path filtering for find_path and find_paths.

NOTE(review): this copy was rebuilt from a whitespace-collapsed patch.
Indentation and the position of blank context lines are inferred and were
checked only against the hunk line counts; confirm against the target tree
before applying.
NOTE(review): the placeholder-free f prefix on the
"SET graph_accel.edge_types = %s" literal was removed; the post-image blob
id on the first index line predates that edit.

diff --git a/api/app/lib/graph_facade.py b/api/app/lib/graph_facade.py
index 89b77039..b9e54c57 100644
--- a/api/app/lib/graph_facade.py
+++ b/api/app/lib/graph_facade.py
@@ -344,6 +344,7 @@ def find_paths(
                         if all(r.get('label', '') in type_set for r in p.get('path_rels', []))
                     ]
 
+                paths = paths[:max_paths]
                 logger.debug(f"find_paths: graph_accel returned {len(paths)} paths")
                 return paths
 
@@ -395,16 +396,19 @@ def _find_path_accel(
 
         return self._accel_path_rows_to_dict(rows)
 
-    def _accel_path_rows_to_dict(self, rows: List[Dict]) -> Dict[str, Any]:
+    def _accel_path_rows_to_dict(self, rows: List[Dict]) -> Optional[Dict[str, Any]]:
         """Convert graph_accel_path rows to the path dict format.
 
         graph_accel returns: step, app_id, label, rel_type, direction
         We need: path_nodes [{concept_id, label, description}],
                  path_rels [{label, properties}], hops
 
-        Nodes without app_id (Source, Instance) use their AGE vertex label
-        as a fallback display name.
+        Returns None if the path traverses non-Concept nodes (phantom
+        references from dangling edges to Source/Instance/Ontology).
         """
+        if any(not row.get('app_id') for row in rows):
+            return None
+
         node_ids = [row['app_id'] for row in rows]
 
         path_rels = []
@@ -427,7 +431,6 @@ def _accel_path_rows_to_dict(self, rows: List[Dict]) -> Dict[str, Any]:
             if data:
                 path_nodes.append(data)
             else:
-                # Non-Concept node (Source, Instance) — use AGE label
                 path_nodes.append({
                     "concept_id": nid or '',
                     "label": row.get('label', '') or '',
@@ -473,6 +476,13 @@ def _find_paths_accel(
         result = []
         for pi in sorted(paths_by_index.keys()):
             path_rows = paths_by_index[pi]
+
+            # Skip paths containing non-Concept nodes (phantom references
+            # from dangling edges to Source/Instance/Ontology nodes that
+            # aren't loaded in the in-memory graph).
+            if any(not row.get('app_id') for row in path_rows):
+                continue
+
             path_nodes = []
             path_rels = []
 
@@ -482,7 +492,6 @@ def _find_paths_accel(
                 if data:
                     path_nodes.append(data)
                 else:
-                    # Non-Concept node (Source, Instance) — use AGE label
                     path_nodes.append({
                         "concept_id": nid or '',
                         "label": row.get('label', '') or '',
@@ -982,13 +991,50 @@ def _get_accel_connection(self):
                 )
                 _accel_conn.autocommit = True
 
-                # Set GUCs once for the connection lifetime
                 with _accel_conn.cursor() as cur:
                     cur.execute("SET graph_accel.node_id_property = 'concept_id'")
 
                 logger.info("graph_accel: created dedicated connection")
             return _accel_conn
 
+    # Provenance/bookkeeping edge types that connect Concepts to
+    # infrastructure nodes (Source, Instance, Ontology). Loading these
+    # creates phantom paths through co-occurrence rather than semantics.
+    _INFRA_EDGE_TYPES = frozenset({
+        'APPEARS', 'EVIDENCED_BY', 'FROM_SOURCE',
+        'SCOPED_BY', 'HAS_SOURCE', 'IMAGES',
+    })
+
+    def _set_accel_gucs(self, cur) -> None:
+        """Set graph_accel GUCs for semantic-only graph loading.
+
+        Called after graph_accel_status() has loaded the shared library
+        (which registers the GUCs), but before graph_accel_load().
+        """
+        cur.execute("SET graph_accel.node_labels = 'Concept'")
+        # Build edge type include list by excluding infrastructure types
+        cur.execute(
+            "SELECT l.name FROM ag_catalog.ag_label l "
+            "JOIN ag_catalog.ag_graph g ON l.graph = g.graphid "
+            "WHERE g.name = 'knowledge_graph' AND l.kind = 'e' "
+            "AND l.name NOT LIKE '\\_%'"  # skip internal _ag_label_edge
+        )
+        all_edge_types = {row['name'] for row in cur.fetchall()}
+        semantic_types = sorted(all_edge_types - self._INFRA_EDGE_TYPES)
+        if semantic_types:
+            edge_types_csv = ','.join(semantic_types)
+            cur.execute("SET graph_accel.edge_types = %s", (edge_types_csv,))
+        else:
+            logger.warning(
+                "graph_accel: no semantic edge types found — "
+                "edge_types GUC not set (defaults to *)"
+            )
+        logger.info(
+            f"graph_accel: GUCs set — node_labels=Concept, "
+            f"edge_types={len(semantic_types)} semantic / "
+            f"{len(all_edge_types)} total"
+        )
+
     def _execute_sql(
         self,
         query: str,
@@ -1007,12 +1053,18 @@ def _execute_sql(
         conn = self._get_accel_connection()
         try:
             with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
-                # Ensure graph is loaded in this backend
+                # Ensure graph is loaded in this backend.
+                # graph_accel_status() triggers library loading, which
+                # registers GUCs — must call this before setting GUCs.
                 cur.execute("SELECT status FROM graph_accel_status()")
                 status_row = cur.fetchone()
                 backend_status = status_row['status'] if status_row else 'unknown'
                 if backend_status == 'not_loaded':
                     logger.info("graph_accel: loading graph...")
+                    # Set GUCs now that the library is loaded and GUCs
+                    # are registered. These filter what gets loaded into
+                    # the in-memory graph.
+                    self._set_accel_gucs(cur)
                     cur.execute(
                         "SELECT * FROM graph_accel_load(%s)",
                         ('knowledge_graph',)
diff --git a/tests/unit/lib/test_graph_facade.py b/tests/unit/lib/test_graph_facade.py
index 2fc249a7..4b442d8d 100644
--- a/tests/unit/lib/test_graph_facade.py
+++ b/tests/unit/lib/test_graph_facade.py
@@ -327,6 +327,52 @@ def test_accel_multi_path_empty(self, facade):
             paths = facade.find_paths('c_a', 'c_z', max_paths=5)
             assert paths == []
 
+    def test_accel_single_path_filters_phantom_nodes(self, facade):
+        """Single path through non-Concept node (no app_id) → returns None."""
+        facade._accel_available = True
+
+        with patch.object(facade, '_execute_sql') as mock_sql, \
+             patch.object(facade, '_concept_exists', return_value=True), \
+             patch.object(facade, '_find_path_bfs', return_value=None):
+            mock_sql.return_value = [
+                {'step': 0, 'app_id': 'c_a', 'label': 'Concept', 'rel_type': None, 'direction': None},
+                {'step': 1, 'app_id': None, 'label': '', 'rel_type': 'APPEARS', 'direction': 'outgoing'},
+                {'step': 2, 'app_id': 'c_b', 'label': 'Concept', 'rel_type': 'APPEARS', 'direction': 'incoming'},
+            ]
+
+            result = facade.find_path('c_a', 'c_b')
+            # Phantom path filtered, falls through to BFS which returns None
+            assert result is None
+
+    def test_accel_multi_path_filters_phantom_paths(self, facade):
+        """Paths through non-Concept nodes are excluded from multi-path results."""
+        facade._accel_available = True
+
+        with patch.object(facade, '_execute_sql') as mock_sql, \
+             patch.object(facade, '_hydrate_concepts') as mock_hydrate:
+            mock_sql.return_value = [
+                # Path 0: phantom (goes through Source with no app_id)
+                {'path_index': 0, 'step': 0, 'app_id': 'c_a', 'label': 'Concept', 'rel_type': None, 'direction': None},
+                {'path_index': 0, 'step': 1, 'app_id': None, 'label': '', 'rel_type': 'APPEARS', 'direction': 'outgoing'},
+                {'path_index': 0, 'step': 2, 'app_id': 'c_b', 'label': 'Concept', 'rel_type': 'APPEARS', 'direction': 'incoming'},
+                # Path 1: clean semantic path
+                {'path_index': 1, 'step': 0, 'app_id': 'c_a', 'label': 'Concept', 'rel_type': None, 'direction': None},
+                {'path_index': 1, 'step': 1, 'app_id': 'c_c', 'label': 'Concept', 'rel_type': 'CONTAINS', 'direction': 'outgoing'},
+                {'path_index': 1, 'step': 2, 'app_id': 'c_b', 'label': 'Concept', 'rel_type': 'CONTAINS', 'direction': 'outgoing'},
+            ]
+            mock_hydrate.return_value = {
+                'c_a': {'concept_id': 'c_a', 'label': 'A', 'description': ''},
+                'c_b': {'concept_id': 'c_b', 'label': 'B', 'description': ''},
+                'c_c': {'concept_id': 'c_c', 'label': 'C', 'description': ''},
+            }
+
+            paths = facade.find_paths('c_a', 'c_b', max_paths=5)
+
+        # Only the clean path survives
+        assert len(paths) == 1
+        assert paths[0]['hops'] == 2
+        assert paths[0]['path_nodes'][1]['concept_id'] == 'c_c'
+
     def test_same_node_returns_self_path(self, facade):
         """from_id == to_id returns single-node path."""
         with patch.object(facade, '_hydrate_concepts') as mock_hydrate: