-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGraph.py
More file actions
500 lines (430 loc) · 21.3 KB
/
Graph.py
File metadata and controls
500 lines (430 loc) · 21.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
import os
import networkx as nx
from Nodes import find_nodes, create_secondary_nodes
import plotly.graph_objects as go
import math
import re
def create_network_graph(snapshot_folder_name, include_classes=None, include_programs=None, name_contains=None, highlight_json_contains=None, highlight_content_contains=None, border_override=None, include_secondary=True):
"""Create a network graph from the JSON configuration files, grouped by program.
include_classes: optional list of class/type names to include. If None, include all.
include_programs: optional list of program names to include. If None, include all.
name_contains: optional substring filter (case-insensitive) on node name. If provided, only nodes whose
internal name contains this text will be included.
highlight_json_contains: optional regex pattern (case-insensitive). If provided, nodes with JSON available
will be highlighted green if they match, red if they don't. Nodes without JSON are black.
highlight_content_contains: optional regex pattern (case-insensitive). If provided, nodes with extracted content
will be highlighted green if they match, red if they don't. Nodes without content are black.
border_override: optional dict mapping node name -> one of {'same','distinct','changed'} to control border colors for compare views.
include_secondary: when True (default) include implied secondary nodes; when False, show only primary nodes.
"""
snapshot_folder = os.path.join("Snapshots", snapshot_folder_name)
# Debug: Check if snapshot folder exists
if not os.path.exists(snapshot_folder):
print(f"Error: Snapshot folder '{snapshot_folder}' does not exist")
return None
json_files = [f for f in os.listdir(snapshot_folder) if f.endswith('.json')]
# Debug: Check if JSON files were found
if not json_files:
print(f"Error: No JSON files found in '{snapshot_folder}'")
return None
nodes = find_nodes(json_files, snapshot_folder)
# Debug: Check if nodes were created
if not nodes:
print("Error: No nodes found")
return None
print(f"Found {len(nodes)} initial nodes")
if include_secondary:
nodes = create_secondary_nodes(nodes)
print(f"Total nodes after secondary creation: {len(nodes)}")
else:
print("Skipping secondary node creation for this graph (primary-only)")
# Filter by include_classes if provided
if include_classes is not None:
class_set = set(include_classes)
nodes = [n for n in nodes if n.get('class') in class_set]
print(f"Nodes after class filter ({len(class_set)} selected): {len(nodes)}")
# Filter by include_programs if provided and not empty
if include_programs is not None and len(include_programs) > 0:
prog_set = set(include_programs)
nodes = [n for n in nodes if n.get('program') in prog_set]
print(f"Nodes after program filter ({len(prog_set)} selected): {len(nodes)}")
# Filter by name_contains if provided (case-insensitive substring on node 'name')
if name_contains:
term = name_contains.lower()
nodes = [n for n in nodes if isinstance(n.get('name'), str) and term in n.get('name').lower()]
print(f"Nodes after name filter ('{name_contains}') : {len(nodes)}")
# Early return with empty figure if no nodes to render
if not nodes:
return go.Figure(data=[], layout=go.Layout(
title=f'Configuration Network for {snapshot_folder_name} (No elements to display)',
showlegend=False,
hovermode='closest',
margin=dict(b=20, l=5, r=5, t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor='rgba(240,240,240,0.8)',
autosize=True,
height=800
))
# Compile regex patterns (case-insensitive); handle invalid patterns gracefully
def compile_pattern(pat):
if not pat:
return None
try:
return re.compile(pat, re.IGNORECASE)
except re.error as e:
print(f"Invalid regex pattern '{pat}': {e}")
return None
json_pat = compile_pattern(highlight_json_contains)
content_pat = compile_pattern(highlight_content_contains)
# Create a NetworkX graph
G = nx.DiGraph()
# Add nodes to the graph (include content/json for highlighting)
for node in nodes:
G.add_node(node['name'],
display_name=node['display_name'],
type=node['class'],
query=node['query'],
name=node['display_name'],
program=node['program'],
order=node.get('order', 'primary'),
json_str=node.get('json'),
content=node.get('content'))
# Add edges based on connections
for node in nodes:
source_node = node['name']
connections = node.get('connections', [])
for connection in connections:
if connection in G.nodes():
G.add_edge(source_node, connection,
relation="references",
edge_type="configuration_reference")
print(f"Created graph with {G.number_of_nodes()} nodes and {G.number_of_edges()} edges")
# Group nodes by program
program_nodes = {}
for node in G.nodes():
program = G.nodes[node]['program']
if program not in program_nodes:
program_nodes[program] = []
program_nodes[program].append(node)
# Create clustered positions
pos = {}
programs = list(program_nodes.keys())
num_programs = len(programs)
# Calculate cluster centers in a grid layout
grid_size = math.ceil(math.sqrt(num_programs)) if num_programs else 1
cluster_spacing = 8 # Distance between cluster centers
# Store cluster centers for labels
cluster_centers = {}
for i, program in enumerate(programs):
# Calculate cluster center position
row = i // grid_size
col = i % grid_size
center_x = col * cluster_spacing
center_y = row * cluster_spacing
# Store cluster center for label placement
cluster_centers[program] = (center_x, center_y)
# Get nodes for this program
program_node_list = program_nodes[program]
if len(program_node_list) == 1:
# Single node, place at center
pos[program_node_list[0]] = (center_x, center_y)
else:
# Multiple nodes, create subgraph and layout within cluster
program_subgraph = G.subgraph(program_node_list)
# Use spring layout for nodes within the cluster
cluster_pos = nx.spring_layout(program_subgraph, k=1, iterations=50, scale=2)
# Offset all positions to the cluster center
for node in program_node_list:
if node in cluster_pos:
pos[node] = (cluster_pos[node][0] + center_x,
cluster_pos[node][1] + center_y)
# Define colors for different node types
class_colors = {
'MessageConfig': '#1f77b4',
'ClientTopic': '#ff7f0e',
'StandaloneFormula': '#2ca02c',
'ClientPageLayout': '#d62728',
'CustomFieldDef': '#bcbd22',
'ClientProgram': '#17becf',
'MessageCategory': '#9467bd',
'Incentive': '#8c564b',
'ClientRaffle': '#e377c2',
'ClientReward': '#7f7f7f',
'ClientTaskHandlerDefinition': '#ff9896',
'Rule': '#e377c2',
'RuleSet': '#7f7f7f',
'default': '#17becf'
}
# Prepare edge traces
edge_x = []
edge_y = []
for edge in G.edges():
if edge[0] in pos and edge[1] in pos:
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
edge_trace = go.Scatter(x=edge_x, y=edge_y,
line=dict(width=0.5, color='#888'),
hoverinfo='none',
mode='lines')
# Create program label traces
label_traces = []
even = True
for program, (center_x, center_y) in cluster_centers.items():
# Calculate actual cluster bounds for better label positioning
if program in program_nodes:
program_x = [pos[node][0] for node in program_nodes[program] if node in pos]
program_y = [pos[node][1] for node in program_nodes[program] if node in pos]
if program_x and program_y:
# Position label at top/bottom of cluster alternately
label_x = center_x
if even:
label_y = max(program_y) + 1 # Position above the cluster
even = False
else:
label_y = min(program_y) - 1 # Position below the cluster
even = True
label_trace = go.Scatter(
x=[label_x],
y=[label_y],
mode='text',
text=[program.replace('ClientProgram:', '')],
textfont=dict(size=10, color='rgba(128, 128, 128, 0.8)'),
textposition='middle center',
showlegend=False,
hoverinfo='none'
)
label_traces.append(label_trace)
# Helper to stringify content for content regex
def stringify_content(val):
if val is None:
return None
try:
if isinstance(val, (list, tuple)):
parts = []
for v in val:
if v is None:
continue
s = str(v).strip()
if s:
parts.append(s)
return "\n".join(parts) if parts else None
s = str(val).strip()
return s if s else None
except Exception:
return None
# Decide node grouping for highlight borders
compare_mode = isinstance(border_override, dict) and len(border_override) > 0
highlight_active = bool((json_pat or content_pat) and not compare_mode)
# When compare_mode, create fixed groups by override
if compare_mode:
compare_groups = {
'same': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []},
'changed': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []},
'distinct': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []}
}
for node in G.nodes():
if node in pos:
x, y = pos[node]
node_info = G.nodes[node]
if node_info['order'] == "secondary":
node_display = " **"
else:
node_display = ""
context_hover_text = (f"<b>Name:</b> {node_info['display_name']}{node_display}<br>"
f"<b>Type:</b> {node_info['type']}<br>"
f"<b>Program:</b> {node_info['program']}<br>")
if node_info['order'] == "secondary":
refers_to_hover_text = ""
else:
refers_to_hover_text = f"<b>Refers To ({len(list(G.successors(node)))}):</b><br>"
for successor in list(G.successors(node)):
refers_to_hover_text += f" {successor}<br>"
if node_info['query'] == "True":
refers_to_hover_text += " <b>Query<br>"
referred_by_hover_text = f"<b>Referred By* ({len(list(G.predecessors(node)))}):</b><br>"
for predecessor in list(G.predecessors(node)):
referred_by_hover_text += f" {predecessor}<br>"
if node_info['order'] == "secondary":
warning_hover_text = f"<b>Warning:</b> This is an implied element and could be missing context"
else:
warning_hover_text = ""
hover_text = context_hover_text + refers_to_hover_text + referred_by_hover_text + warning_hover_text
base_color = class_colors.get(node_info['type'], class_colors['default'])
connection_count = len(list(G.successors(node))) + len(list(G.predecessors(node)))
size = max(8, min(20, 10 + connection_count * 2))
label = border_override.get(node, 'same')
bucket = compare_groups.get(label, compare_groups['same'])
bucket['x'].append(x)
bucket['y'].append(y)
bucket['text'].append(hover_text)
bucket['customdata'].append(node)
bucket['size'].append(size)
bucket['color'].append(base_color)
# Build traces per compare bucket with distinct border colors
node_traces = []
mapping = {
'same': '#000000',
'changed': '#f39c12',
'distinct': '#e74c3c'
}
for key in ['same', 'changed', 'distinct']:
grp = compare_groups[key]
if grp['x']:
node_traces.append(go.Scatter(
x=grp['x'], y=grp['y'], mode='markers', hoverinfo='text', text=grp['text'],
customdata=grp['customdata'],
marker=dict(size=grp['size'], color=grp['color'], line=dict(width=3, color=mapping[key]))
))
all_traces = [edge_trace] + label_traces + node_traces
fig = go.Figure(data=all_traces,
layout=go.Layout(
title=f'Configuration Network for {snapshot_folder_name} (Clustered by Program)',
showlegend=False,
hovermode='closest',
margin=dict(b=20, l=5, r=5, t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor='rgba(240,240,240,0.8)',
autosize=True,
height=800))
return fig
# Buckets for nodes by border outcome (highlight mode)
groups = {
'matched': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []},
'not_matched': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []},
'na': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []},
'default': {'x': [], 'y': [], 'text': [], 'customdata': [], 'size': [], 'color': []}
}
# Prepare node traces data
for node in G.nodes():
if node in pos:
x, y = pos[node]
# Create hover text
node_info = G.nodes[node]
if node_info['order'] == "secondary":
node_display = " **"
else:
node_display = ""
context_hover_text = (f"<b>Name:</b> {node_info['display_name']}{node_display}<br>"
f"<b>Type:</b> {node_info['type']}<br>"
f"<b>Program:</b> {node_info['program']}<br>")
if node_info['order'] == "secondary":
refers_to_hover_text = ""
else:
refers_to_hover_text = f"<b>Refers To ({len(list(G.successors(node)))}):</b><br>"
for successor in list(G.successors(node)):
refers_to_hover_text += f" {successor}<br>"
if node_info['query'] == "True":
refers_to_hover_text += " <b>Query<br>"
referred_by_hover_text = f"<b>Referred By* ({len(list(G.predecessors(node)))}):</b><br>"
for predecessor in list(G.predecessors(node)):
referred_by_hover_text += f" {predecessor}<br>"
if node_info['order'] == "secondary":
warning_hover_text = f"<b>Warning:</b> This is an implied element and could be missing context"
else:
warning_hover_text = ""
hover_text = context_hover_text + refers_to_hover_text + referred_by_hover_text + warning_hover_text
# Set color based on type
base_color = class_colors.get(node_info['type'], class_colors['default'])
# Set size based on connections
connection_count = len(list(G.successors(node))) + len(list(G.predecessors(node)))
size = max(8, min(20, 10 + connection_count * 2))
# Determine highlight group
group_key = 'default'
if highlight_active:
json_available = node_info.get('json_str') is not None and str(node_info.get('json_str')).strip() != ""
content_str = stringify_content(node_info.get('content'))
content_available = content_str is not None
applicable_flags = []
match_flags = []
if json_pat is not None:
applicable_flags.append(json_available)
if json_available:
try:
match_flags.append(bool(json_pat.search(str(node_info.get('json_str')))))
except Exception:
match_flags.append(False)
else:
# not applicable
pass
if content_pat is not None:
applicable_flags.append(content_available)
if content_available:
try:
match_flags.append(bool(content_pat.search(content_str)))
except Exception:
match_flags.append(False)
else:
pass
any_applicable = any(applicable_flags) if applicable_flags else False
if not any_applicable:
group_key = 'na'
else:
# If all applicable patterns matched -> matched
if match_flags and all(match_flags):
group_key = 'matched'
else:
group_key = 'not_matched'
# Append to the selected group
target = groups[group_key]
target['x'].append(x)
target['y'].append(y)
target['text'].append(hover_text)
target['customdata'].append(node)
target['size'].append(size)
target['color'].append(base_color)
# Build node traces
node_traces = []
if not highlight_active:
# Single default trace with white borders
node_traces.append(go.Scatter(
x=groups['default']['x'], y=groups['default']['y'],
mode='markers', hoverinfo='text', text=groups['default']['text'],
customdata=groups['default']['customdata'],
marker=dict(size=groups['default']['size'], color=groups['default']['color'], line=dict(width=2, color='white'))
))
else:
# Matched (green), Not matched (red), Not applicable (black)
if groups['matched']['x']:
node_traces.append(go.Scatter(
x=groups['matched']['x'], y=groups['matched']['y'],
mode='markers', hoverinfo='text', text=groups['matched']['text'],
customdata=groups['matched']['customdata'],
marker=dict(size=groups['matched']['size'], color=groups['matched']['color'], line=dict(width=3, color='#2ecc71'))
))
if groups['not_matched']['x']:
node_traces.append(go.Scatter(
x=groups['not_matched']['x'], y=groups['not_matched']['y'],
mode='markers', hoverinfo='text', text=groups['not_matched']['text'],
customdata=groups['not_matched']['customdata'],
marker=dict(size=groups['not_matched']['size'], color=groups['not_matched']['color'], line=dict(width=3, color='#e74c3c'))
))
if groups['na']['x']:
node_traces.append(go.Scatter(
x=groups['na']['x'], y=groups['na']['y'],
mode='markers', hoverinfo='text', text=groups['na']['text'],
customdata=groups['na']['customdata'],
marker=dict(size=groups['na']['size'], color=groups['na']['color'], line=dict(width=3, color='#000000'))
))
# Combine all traces
all_traces = [edge_trace] + label_traces + node_traces
# Create the figure
fig = go.Figure(data=all_traces,
layout=go.Layout(
title=f'Configuration Network for {snapshot_folder_name} (Clustered by Program)',
showlegend=False,
hovermode='closest',
margin=dict(b=20, l=5, r=5, t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor='rgba(240,240,240,0.8)',
autosize=True,
height=800))
return fig
if __name__ == "__main__":
snapshot_name = "NDP2" # Replace with your snapshot name
graph = create_network_graph(snapshot_name)
print(graph) # This will print the graph object, you can visualize it using Plotly or NetworkX