"""Deploy the master eligibility process to Zeebe and start one instance.

Source file: deploy_master_system.py
"""
import requests
import json
import time
import glob
import os
# Base URL of the local Zeebe REST API (v2); endpoint paths such as
# "/deployments" and "/process-instances" are appended to it.
ZEEBE_BASE = "http://localhost:8081/v2"
def _read_resource(path, upload_name, content_type):
    """Build one multipart ``resources`` part from the file at *path*.

    The file is read completely and closed right away, so no file handle
    stays open while deployment requests are in flight or when the caller
    returns early.
    """
    with open(path, 'rb') as handle:
        return ('resources', (upload_name, handle.read(), content_type))


def deploy_all():
    """Deploy the master BPMN, all forms, and all DMN files to Zeebe.

    ``master_eligibility.bpmn`` and every ``*.form`` file in the current
    working directory are sent in one deployment request.  The DMN files
    under ``dmn_output/`` are then sent in batches of 20, uploaded under
    their base names.

    All resources are read before the first request is sent, so an
    unreadable file raises ``OSError`` before anything is deployed.

    Returns:
        bool: ``False`` if the BPMN file is missing, or if the BPMN/form
        deployment returns a non-200 status or raises.  ``True`` otherwise.
        DMN batches are best effort: a failed batch is reported on stdout
        and the remaining batches are still attempted.
    """
    print("--- Deploying Full System ---")
    url = f"{ZEEBE_BASE}/deployments"
    batch_size = 20

    # 1. Master BPMN
    bpmn_path = "master_eligibility.bpmn"
    if not os.path.exists(bpmn_path):
        print(f"Error: {bpmn_path} not found.")
        return False
    files = [_read_resource(bpmn_path, bpmn_path, 'application/xml')]

    # 1.5 Forms
    for form_path in glob.glob("*.form"):
        print(f"Found form: {form_path}")
        files.append(_read_resource(form_path, form_path, 'application/json'))

    # 2. All DMNs
    dmn_files = glob.glob("dmn_output/*.dmn")
    print(f"Found {len(dmn_files)} DMN files.")
    dmn_parts = [
        _read_resource(path, os.path.basename(path), 'application/xml')
        for path in dmn_files
    ]

    # The BPMN only needs to be deployed once, so it goes out together with
    # the forms, separately from the DMN batches.
    try:
        print(f"Deploying {len(files)} process resources (BPMN + Forms)...")
        resp = requests.post(url, files=files)
        if resp.status_code != 200:
            print(f"BPMN/Form Deployment failed: {resp.text}")
            return False
        print("BPMN/Form Deployment success.")
        print(json.dumps(resp.json(), indent=2))
    except Exception as e:
        # Script-level boundary: report any failure and signal it to the caller.
        print(f"Failed to deploy BPMN: {e}")
        return False

    # Deploy DMNs in batches to keep each request small.
    failed_batches = 0
    for i in range(0, len(dmn_parts), batch_size):
        batch = dmn_parts[i:i + batch_size]
        print(f"Deploying batch {i} to {i+len(batch)}...")
        try:
            resp = requests.post(url, files=batch)
            if resp.status_code != 200:
                print(f"Batch failed: {resp.text}")
                failed_batches += 1
            else:
                print("Batch success.")
        except Exception as e:
            print(f"Batch error: {e}")
            failed_batches += 1

    if failed_batches:
        print(f"Warning: {failed_batches} DMN batch(es) failed to deploy.")
    return True
def start_master_process():
    """Start one ``Process_Master_Eligibility`` instance with test data.

    The ``ruleList`` variable is built from the file names under
    ``dmn_output/`` (``S2R001.dmn`` -> ``S2R001``), sorted so the payload is
    the same on every run regardless of directory listing order.

    Returns:
        The ``processInstanceKey`` value from the JSON response when the
        request returns HTTP 200; ``None`` when the status is not 200 or the
        request raises.
    """
    print("\n--- Starting Master Process ---")
    url = f"{ZEEBE_BASE}/process-instances"

    # Strip only the file extension; a blanket str.replace would also remove
    # ".dmn" occurring elsewhere in the file name.
    rule_ids = sorted(
        os.path.splitext(os.path.basename(path))[0]
        for path in glob.glob("dmn_output/*.dmn")
    )

    # Per the original author's notes, the BPMN resolves each decision as
    # "Decision_" + item and the DMN files declare ids like "Decision_S2R001",
    # so each item must be the bare rule id (e.g. "S2R001").
    payload = {
        "processDefinitionId": "Process_Master_Eligibility",
        "variables": {
            "ruleList": rule_ids,
            # Test Data: A family of 3 with moderate income
            "household": {
                "city": "NYC",
                "members": 3
            },
            "person": {
                "age": 30,
                "disabled": False,
                "blind": False
            },
            "income": 2000.0,  # Monthly -> Yearly 24000
            "expenseAmount": 100.0,
            "expenseType": "ChildCare"
        }
    }

    try:
        response = requests.post(url, json=payload)
        if response.status_code != 200:
            print(f"Start failed: {response.text}")
            return None
        key = response.json().get('processInstanceKey')
        print(f"Started Master Instance: {key}")
        return key
    except Exception as e:
        # Script-level boundary: report any failure and return None.
        print(f"Error: {e}")
        return None
# Script entry point: deploy everything, then start one master instance only
# if the deployment reported success.
if __name__ == "__main__":
    if deploy_all():
        start_master_process()
        # NOTE(review): the original indentation was lost, so the nesting of
        # these two reminders under the successful-deploy branch is assumed.
        # Confirm against the original file.
        print("\nNOTE: Ensure 'python aggregation_worker.py' is running in another terminal!")
        print("Then check Tasklist: http://localhost:8083 (demo/demo)")