Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 59 additions & 7 deletions api/app/lib/graph_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ def find_paths(
if all(r.get('label', '') in type_set
for r in p.get('path_rels', []))
]
paths = paths[:max_paths]
logger.debug(f"find_paths: graph_accel returned {len(paths)} paths")
return paths

Expand Down Expand Up @@ -395,16 +396,19 @@ def _find_path_accel(

return self._accel_path_rows_to_dict(rows)

def _accel_path_rows_to_dict(self, rows: List[Dict]) -> Dict[str, Any]:
def _accel_path_rows_to_dict(self, rows: List[Dict]) -> Optional[Dict[str, Any]]:
"""Convert graph_accel_path rows to the path dict format.

graph_accel returns: step, app_id, label, rel_type, direction
We need: path_nodes [{concept_id, label, description}],
path_rels [{label, properties}], hops

Nodes without app_id (Source, Instance) use their AGE vertex label
as a fallback display name.
Returns None if the path traverses non-Concept nodes (phantom
references from dangling edges to Source/Instance/Ontology).
"""
if any(not row.get('app_id') for row in rows):
return None

node_ids = [row['app_id'] for row in rows]
path_rels = []

Expand All @@ -427,7 +431,6 @@ def _accel_path_rows_to_dict(self, rows: List[Dict]) -> Dict[str, Any]:
if data:
path_nodes.append(data)
else:
# Non-Concept node (Source, Instance) — use AGE label
path_nodes.append({
"concept_id": nid or '',
"label": row.get('label', '') or '',
Expand Down Expand Up @@ -473,6 +476,13 @@ def _find_paths_accel(
result = []
for pi in sorted(paths_by_index.keys()):
path_rows = paths_by_index[pi]

# Skip paths containing non-Concept nodes (phantom references
# from dangling edges to Source/Instance/Ontology nodes that
# aren't loaded in the in-memory graph).
if any(not row.get('app_id') for row in path_rows):
continue

path_nodes = []
path_rels = []

Expand All @@ -482,7 +492,6 @@ def _find_paths_accel(
if data:
path_nodes.append(data)
else:
# Non-Concept node (Source, Instance) — use AGE label
path_nodes.append({
"concept_id": nid or '',
"label": row.get('label', '') or '',
Expand Down Expand Up @@ -982,13 +991,50 @@ def _get_accel_connection(self):
)
_accel_conn.autocommit = True

# Set GUCs once for the connection lifetime
with _accel_conn.cursor() as cur:
cur.execute("SET graph_accel.node_id_property = 'concept_id'")

logger.info("graph_accel: created dedicated connection")
return _accel_conn

# Edge labels that exist purely for provenance/bookkeeping: they link
# Concepts to infrastructure nodes (Source, Instance, Ontology). Loading
# them produces phantom paths driven by co-occurrence, not semantics.
_INFRA_EDGE_TYPES = frozenset((
    'APPEARS',
    'EVIDENCED_BY',
    'FROM_SOURCE',
    'HAS_SOURCE',
    'IMAGES',
    'SCOPED_BY',
))

def _set_accel_gucs(self, cur) -> None:
    """Set graph_accel GUCs so that only the semantic graph is loaded.

    Restricts loaded nodes to the ``Concept`` label, and restricts loaded
    edges to every edge label of the ``knowledge_graph`` AGE graph that is
    not listed in ``self._INFRA_EDGE_TYPES``.

    Must be called after ``graph_accel_status()`` has loaded the shared
    library (which registers the GUCs), but before ``graph_accel_load()``.

    Args:
        cur: Open cursor on the graph_accel connection. Rows are read by
            column name (``row['name']``), so the cursor must return
            mapping-style rows.
    """
    cur.execute("SET graph_accel.node_labels = 'Concept'")

    # Build the edge type include list: all edge labels of the graph,
    # minus the infrastructure types.
    cur.execute(
        "SELECT l.name FROM ag_catalog.ag_label l "
        "JOIN ag_catalog.ag_graph g ON l.graph = g.graphid "
        "WHERE g.name = 'knowledge_graph' AND l.kind = 'e' "
        "AND l.name NOT LIKE '\\_%'"  # skip internal _ag_label_edge
    )
    all_edge_types = {row['name'] for row in cur.fetchall()}
    # Sorted so the resulting GUC value is deterministic across loads.
    semantic_types = sorted(all_edge_types - self._INFRA_EDGE_TYPES)

    if semantic_types:
        # Plain string (not an f-string): the %s is a driver-side
        # parameter placeholder, not Python string formatting.
        cur.execute(
            "SET graph_accel.edge_types = %s",
            (','.join(semantic_types),),
        )
    else:
        logger.warning(
            "graph_accel: no semantic edge types found — "
            "edge_types GUC not set (defaults to *)"
        )

    logger.info(
        "graph_accel: GUCs set — node_labels=Concept, "
        "edge_types=%d semantic / %d total",
        len(semantic_types),
        len(all_edge_types),
    )

def _execute_sql(
self,
query: str,
Expand All @@ -1007,12 +1053,18 @@ def _execute_sql(
conn = self._get_accel_connection()
try:
with conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
# Ensure graph is loaded in this backend
# Ensure graph is loaded in this backend.
# graph_accel_status() triggers library loading, which
# registers GUCs — must call this before setting GUCs.
cur.execute("SELECT status FROM graph_accel_status()")
status_row = cur.fetchone()
backend_status = status_row['status'] if status_row else 'unknown'
if backend_status == 'not_loaded':
logger.info("graph_accel: loading graph...")
# Set GUCs now that the library is loaded and GUCs
# are registered. These filter what gets loaded into
# the in-memory graph.
self._set_accel_gucs(cur)
cur.execute(
"SELECT * FROM graph_accel_load(%s)",
('knowledge_graph',)
Expand Down
46 changes: 46 additions & 0 deletions tests/unit/lib/test_graph_facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,52 @@ def test_accel_multi_path_empty(self, facade):
paths = facade.find_paths('c_a', 'c_z', max_paths=5)
assert paths == []

def test_accel_single_path_filters_phantom_nodes(self, facade):
    """A single accel path containing a row without app_id yields None."""
    # Middle step has no app_id, i.e. it is not a Concept node.
    phantom_rows = [
        {'step': 0, 'app_id': 'c_a', 'label': 'Concept', 'rel_type': None, 'direction': None},
        {'step': 1, 'app_id': None, 'label': '', 'rel_type': 'APPEARS', 'direction': 'outgoing'},
        {'step': 2, 'app_id': 'c_b', 'label': 'Concept', 'rel_type': 'APPEARS', 'direction': 'incoming'},
    ]
    facade._accel_available = True

    with patch.object(facade, '_execute_sql') as mock_sql:
        with patch.object(facade, '_concept_exists', return_value=True):
            with patch.object(facade, '_find_path_bfs', return_value=None):
                mock_sql.return_value = phantom_rows
                outcome = facade.find_path('c_a', 'c_b')

    # The phantom path is discarded; the patched BFS fallback gives None.
    assert outcome is None

def test_accel_multi_path_filters_phantom_paths(self, facade):
    """Multi-path results drop every path that has a row without app_id."""
    # Path 0: phantom (middle step has no app_id).
    phantom_path = [
        {'path_index': 0, 'step': 0, 'app_id': 'c_a', 'label': 'Concept', 'rel_type': None, 'direction': None},
        {'path_index': 0, 'step': 1, 'app_id': None, 'label': '', 'rel_type': 'APPEARS', 'direction': 'outgoing'},
        {'path_index': 0, 'step': 2, 'app_id': 'c_b', 'label': 'Concept', 'rel_type': 'APPEARS', 'direction': 'incoming'},
    ]
    # Path 1: clean semantic path through c_c.
    clean_path = [
        {'path_index': 1, 'step': 0, 'app_id': 'c_a', 'label': 'Concept', 'rel_type': None, 'direction': None},
        {'path_index': 1, 'step': 1, 'app_id': 'c_c', 'label': 'Concept', 'rel_type': 'CONTAINS', 'direction': 'outgoing'},
        {'path_index': 1, 'step': 2, 'app_id': 'c_b', 'label': 'Concept', 'rel_type': 'CONTAINS', 'direction': 'outgoing'},
    ]
    hydrated = {
        concept_id: {'concept_id': concept_id, 'label': name, 'description': ''}
        for concept_id, name in (('c_a', 'A'), ('c_b', 'B'), ('c_c', 'C'))
    }
    facade._accel_available = True

    with patch.object(facade, '_execute_sql') as mock_sql:
        with patch.object(facade, '_hydrate_concepts') as mock_hydrate:
            mock_sql.return_value = phantom_path + clean_path
            mock_hydrate.return_value = hydrated

            paths = facade.find_paths('c_a', 'c_b', max_paths=5)

    # Only the clean path survives.
    assert len(paths) == 1
    survivor = paths[0]
    assert survivor['hops'] == 2
    assert survivor['path_nodes'][1]['concept_id'] == 'c_c'

def test_same_node_returns_self_path(self, facade):
"""from_id == to_id returns single-node path."""
with patch.object(facade, '_hydrate_concepts') as mock_hydrate:
Expand Down