-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathorchestrator.py
More file actions
114 lines (92 loc) · 3.61 KB
/
orchestrator.py
File metadata and controls
114 lines (92 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
orchestrator.py
----------------
Central orchestration engine for IAM Automation Platform.
Coordinates:
- Workday feed events
- SCIM provisioning
- Microsoft Graph automation
- Logging and error handling
This module acts as the "brain" of the system.
"""
from logger import log
from hr.workday_events import WorkdayEventParser
from hr.provision_new_hire import process_new_hire
from hr.provision_update import process_update
from hr.provision_termination import process_termination
from graph_users import get_user
from utils import load_json_file
class IAMOrchestrator:
    """Route Workday HR events to the matching provisioning workflow.

    Every handler returns a plain dict with a ``status`` key (``"success"``
    or ``"error"``) and a human-readable ``message``. Successful results
    additionally carry a ``user_id`` key.
    """

    def __init__(self, config_path="config.json"):
        """Load the configuration file and create the Workday event parser.

        Args:
            config_path: Path handed to ``load_json_file``.
        """
        log.info("Initializing IAM Orchestrator")
        self.config = load_json_file(config_path)
        self.parser = WorkdayEventParser()

    # ---------------------------------------------------------
    # HIGH-LEVEL ENTRYPOINTS
    # ---------------------------------------------------------
    def handle_workday_event(self, event: dict):
        """
        Main entrypoint for Workday -> IAM automation.

        Asks the parser for the event type and dispatches to the matching
        private handler.

        Args:
            event: Workday event payload.

        Returns:
            The handler's result dict, or an error dict when the event type
            is not one of ``new_hire``, ``update`` or ``termination``.
        """
        # NOTE(review): this logs the full event payload; confirm that is
        # acceptable if events carry personal data.
        log.info(f"Received Workday event: {event}")
        event_type = self.parser.get_event_type(event)

        if event_type == "new_hire":
            return self._handle_new_hire(event)
        if event_type == "update":
            return self._handle_update(event)
        if event_type == "termination":
            return self._handle_termination(event)

        log.error(f"Unknown Workday event type: {event_type}")
        return {"status": "error", "message": "Unknown event type"}

    # ---------------------------------------------------------
    # EVENT HANDLERS
    # ---------------------------------------------------------
    def _handle_new_hire(self, event):
        """Provision a new hire and report the created user's ID.

        Returns:
            A success dict whose ``user_id`` is the ``entra_user_id`` reported
            by ``process_new_hire``, or an error dict when provisioning did
            not report success.
        """
        log.info("Processing NEW HIRE event")
        result = process_new_hire(event)
        if not result.get("success"):
            return {
                "status": "error",
                "message": result.get("error", "New hire provisioning failed"),
            }
        return {
            "status": "success",
            "message": "New hire provisioned successfully",
            "user_id": result.get("entra_user_id"),
        }

    def _handle_update(self, event):
        """Apply an update event to the user identified by ``employeeId``.

        Returns:
            A success dict carrying the ``employeeId`` as ``user_id``, or an
            error dict when the ID is missing or the update did not report
            success.
        """
        log.info("Processing UPDATE event")
        user_id = event.get("employeeId")
        if user_id is None or user_id == "":
            # Without an ID there is nothing to look up; fail in the same
            # shape as every other error path instead of querying for None.
            log.error("Update event is missing employeeId")
            return {"status": "error", "message": "Missing employeeId in update event"}

        # NOTE(review): the lookup result is forwarded unchecked; confirm
        # that process_update copes with a user that was not found.
        entra_user = get_user(user_id)
        result = process_update(event, entra_user)
        if not result.get("success"):
            return {
                "status": "error",
                "message": result.get("error", "User update failed"),
            }
        return {
            "status": "success",
            "message": "User updated successfully",
            "user_id": user_id,
        }

    def _handle_termination(self, event):
        """Run the termination workflow for the event.

        Returns:
            A success dict carrying the event's ``employeeId`` as ``user_id``,
            or an error dict when termination did not report success.
        """
        log.info("Processing TERMINATION event")
        result = process_termination(event)
        if not result.get("success"):
            return {
                "status": "error",
                "message": result.get("error", "Termination failed"),
            }
        return {
            "status": "success",
            "message": "User terminated successfully",
            "user_id": event.get("employeeId"),
        }

    # ---------------------------------------------------------
    # BULK PROCESSING (CSV)
    # ---------------------------------------------------------
    def process_csv(self, csv_path: str):
        """
        Allows bulk provisioning via CSV upload.

        Each row is converted to an event by the parser and passed through
        ``handle_workday_event``.

        Args:
            csv_path: Path of the CSV file to read; the first row supplies
                the column names.

        Returns:
            A list with one result dict per data row, in file order.
        """
        import csv

        # newline="" leaves newline handling to the csv module, which is
        # required for quoted fields that contain embedded line breaks.
        with open(csv_path, "r", newline="") as f:
            return [
                self.handle_workday_event(self.parser.from_csv_row(row))
                for row in csv.DictReader(f)
            ]