-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdfg_builder.py
More file actions
83 lines (72 loc) · 3.07 KB
/
Copy pathdfg_builder.py
File metadata and controls
83 lines (72 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Lightweight DFG builder for simple same-function define-use matching."""
from __future__ import annotations
from typing import Any
try:
from code_graph_demo.graph_schema import Edge, Node
except ImportError:
from graph_schema import Edge, Node
def _event_id(owner_id: str, event: dict[str, Any], index: int) -> str:
statement_path = str(event.get("statement_path", event.get("statement_index", ""))).replace(".", "_")
return (
f"var:{owner_id}:{event.get('kind')}:{event.get('name')}:"
f"{event.get('lineno')}:{statement_path}:{index}"
)
def _add_dfg_for_callable(
owner_id: str,
owner_path: str,
events: list[dict[str, Any]],
nodes: list[Node],
edges: list[Edge],
) -> None:
latest_defs: dict[str, list[str]] = {}
for index, event in enumerate(events):
name = event.get("name", "")
kind = event.get("kind")
context = str(event.get("context", "normal"))
node_id = _event_id(owner_id, event, index)
node_type = "VariableDef" if kind == "def" else "VariableUse"
nodes.append(
Node(
id=node_id,
type=node_type,
name=name,
path=owner_path,
lineno=event.get("lineno"),
metadata={
"owner": owner_id,
"statement_index": event.get("statement_index"),
"statement_path": event.get("statement_path"),
"scope": owner_id,
"context": context,
},
)
)
if kind == "def":
edges.append(Edge(source=owner_id, target=node_id, type="DEFINES"))
if context in {"branch", "except"} or context.endswith("_except"):
latest_defs.setdefault(name, [])
if node_id not in latest_defs[name]:
latest_defs[name].append(node_id)
else:
latest_defs[name] = [node_id]
elif kind == "use":
edges.append(Edge(source=owner_id, target=node_id, type="USES"))
for def_id in latest_defs.get(name, []):
edges.append(Edge(source=def_id, target=node_id, type="DATA_FLOW"))
def build_dfg(ast_infos: list[dict[str, Any]]) -> dict[str, list[Any]]:
"""Build DFG nodes and edges from AST summaries."""
nodes: list[Node] = []
edges: list[Edge] = []
for info in ast_infos:
if info.get("error"):
continue
path = info.get("file", {}).get("path", "")
for function in info.get("functions", []):
owner_id = f"function:{path}:{function.get('name', '')}"
_add_dfg_for_callable(owner_id, path, function.get("data_flow", []), nodes, edges)
for cls in info.get("classes", []):
class_name = cls.get("name", "")
for method in cls.get("methods", []):
owner_id = f"method:{path}:{class_name}.{method.get('name', '')}"
_add_dfg_for_callable(owner_id, path, method.get("data_flow", []), nodes, edges)
return {"nodes": nodes, "edges": edges}