-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_integration.py
More file actions
304 lines (246 loc) · 9.79 KB
/
mcp_integration.py
File metadata and controls
304 lines (246 loc) · 9.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
IBA + Anthropic MCP Integration Example
This example shows how to integrate Intent-Bound Authorization with
Anthropic's Model Context Protocol (MCP) to create purpose-aware AI agents.
The integration provides:
1. Intent validation before tool execution
2. Automatic drift detection
3. Comprehensive audit logging
4. Real-time violation blocking
Author: Grokipaedia Research
License: MIT
"""
import sys
sys.path.insert(0, '..')
from iba import (
IntentDeclaration,
IntentScope,
IntentValidator,
IntentViolationError
)
from typing import Any, Dict, Optional
from datetime import datetime
import json
class IBAMCPServer:
"""
An MCP Server with Intent-Bound Authorization.
This wraps the standard MCP Server to add intent validation
before every tool call.
"""
def __init__(self, intent: IntentDeclaration, verbose: bool = True):
"""
Initialize IBA-enabled MCP server.
Args:
intent: The IntentDeclaration defining what this agent can do
verbose: Whether to print validation results
"""
self.intent = intent
self.validator = IntentValidator(intent)
self.verbose = verbose
self.tools: Dict[str, Any] = {}
if verbose:
print(f"🔐 IBA MCP Server initialized")
print(f"📋 Purpose: {intent.declared_purpose}")
print(f"👤 Authorized by: {intent.authorized_by}")
print(f"⏰ Valid until: {intent.expiration.strftime('%Y-%m-%d %H:%M:%S UTC')}")
print()
def register_tool(self, name: str, func: Any, resource: str):
"""
Register a tool with its associated resource.
Args:
name: Tool name
func: Tool function
resource: Resource identifier (e.g., "calendar:write")
"""
self.tools[name] = {
'function': func,
'resource': resource
}
if self.verbose:
print(f"🔧 Registered tool: {name} → {resource}")
def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""
Call a tool with intent validation.
This is the core integration point. Every tool call goes through
intent validation before execution.
"""
if name not in self.tools:
return {
'success': False,
'error': f"Unknown tool: {name}"
}
tool = self.tools[name]
resource = tool['resource']
# PRE-EXECUTION GATE: Validate against intent
validation_result = self.validator.validate_action(name, resource)
if not validation_result['allowed']:
# Intent violation detected - block execution
if self.verbose:
print(f"\n❌ BLOCKED: {name}")
print(f" Resource: {resource}")
print(f" Reason: {validation_result['reason']}")
print()
# Check for drift
drift = self.validator.detect_drift()
if drift['drift_detected']:
if self.verbose:
print(f"⚠️ DRIFT DETECTED: {drift['reason']}")
print(f" Intent may be compromised")
print()
return {
'success': False,
'error': validation_result['reason'],
'intent_violation': True,
'drift_detected': drift['drift_detected']
}
# Validation passed - execute tool
if self.verbose:
print(f"✅ ALLOWED: {name}")
print(f" Resource: {resource}")
print(f" Purpose alignment: Verified")
print()
try:
result = tool['function'](**arguments)
# POST-EXECUTION: Log success
return {
'success': True,
'result': result,
'validated_by_iba': True
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def get_audit_log(self) -> list:
"""Get the complete audit log of all actions."""
return self.validator.get_action_log()
def get_statistics(self) -> Dict[str, Any]:
"""Get statistics about agent behavior."""
return self.validator.get_statistics()
# ============================================================================
# EXAMPLE: Healthcare Appointment Scheduler
# ============================================================================
def search_dentists(location: str) -> list:
"""Simulated dentist search."""
return [
{"name": "Dr. Smith", "address": "123 Main St", "rating": 4.8},
{"name": "Dr. Johnson", "address": "456 Oak Ave", "rating": 4.9}
]
def read_calendar(date_range: str) -> list:
"""Simulated calendar read."""
return [
{"date": "2026-02-05", "time": "14:00", "available": True},
{"date": "2026-02-06", "time": "10:00", "available": True}
]
def create_appointment(dentist: str, date: str, time: str) -> dict:
"""Simulated appointment creation."""
return {
"appointment_id": "apt-001",
"dentist": dentist,
"date": date,
"time": time,
"confirmed": True
}
def access_medical_records(patient_id: str) -> dict:
"""Simulated medical records access - should be BLOCKED."""
return {"patient_id": patient_id, "records": "SENSITIVE DATA"}
def modify_insurance(patient_id: str, plan: str) -> dict:
"""Simulated insurance modification - should be BLOCKED."""
return {"patient_id": patient_id, "plan": plan}
def demo_healthcare_agent():
"""
Demonstrate IBA preventing unauthorized healthcare data access.
This example shows the dentist appointment scenario from the
architecture documentation.
"""
print("=" * 70)
print("DEMO: Healthcare Appointment Scheduler with IBA")
print("=" * 70)
print()
# Define intent scope
scope = IntentScope(
allowed_resources=[
"calendar:read",
"calendar:write",
"healthcare:search",
"booking:create"
],
forbidden_resources=[
"medical_records:*",
"insurance:*",
"payment:modify"
],
resource_limits={
"max_api_calls": 50
}
)
# Create intent declaration
intent = IntentDeclaration(
intent_id="healthcare-001",
declared_purpose="Schedule dentist appointment for next Tuesday",
authorized_by="user@example.com",
scope=scope
)
# Create IBA-enabled MCP server
server = IBAMCPServer(intent, verbose=True)
# Register tools
server.register_tool("search_dentists", search_dentists, "healthcare:search")
server.register_tool("read_calendar", read_calendar, "calendar:read")
server.register_tool("create_appointment", create_appointment, "booking:create")
server.register_tool("access_medical_records", access_medical_records, "medical_records:read")
server.register_tool("modify_insurance", modify_insurance, "insurance:modify")
print("\n" + "─" * 70)
print("SCENARIO 1: Legitimate Actions (Should Succeed)")
print("─" * 70 + "\n")
# Legitimate action 1: Search dentists
result = server.call_tool("search_dentists", {"location": "San Francisco"})
# Legitimate action 2: Read calendar
result = server.call_tool("read_calendar", {"date_range": "2026-02-01 to 2026-02-07"})
# Legitimate action 3: Create appointment
result = server.call_tool("create_appointment", {
"dentist": "Dr. Smith",
"date": "2026-02-05",
"time": "14:00"
})
print("\n" + "─" * 70)
print("SCENARIO 2: Malicious Actions (Should Be Blocked)")
print("─" * 70 + "\n")
# Malicious action 1: Try to access medical records
result = server.call_tool("access_medical_records", {"patient_id": "12345"})
# Malicious action 2: Try to modify insurance
result = server.call_tool("modify_insurance", {"patient_id": "12345", "plan": "premium"})
# Try again - should trigger drift detection
result = server.call_tool("access_medical_records", {"patient_id": "67890"})
result = server.call_tool("modify_insurance", {"patient_id": "67890", "plan": "basic"})
print("\n" + "─" * 70)
print("STATISTICS & AUDIT")
print("─" * 70 + "\n")
# Get statistics
stats = server.get_statistics()
print("📊 Agent Statistics:")
print(f" Total actions: {stats['total_actions']}")
print(f" Allowed: {stats['allowed']}")
print(f" Blocked: {stats['blocked']}")
print(f" Violation rate: {stats['violation_rate']:.1%}")
print()
# Get audit log
print("📋 Audit Log:")
for i, action in enumerate(server.get_audit_log(), 1):
status = "✅" if action['allowed'] else "❌"
print(f" {i}. {status} {action['action']} → {action['resource']}")
if not action['allowed']:
print(f" Reason: {action['reason']}")
print("\n" + "=" * 70)
print("DEMO COMPLETE")
print("=" * 70)
print("\nKey Takeaways:")
print("• Legitimate actions executed successfully")
print("• Malicious actions blocked before execution")
print("• Drift detection triggered after repeated violations")
print("• Complete audit trail maintained")
print("\nThis is IBA in action. Traditional OAuth would have allowed")
print("the medical records access (valid credentials, authorized user).")
print("\nIBA prevented it because it violated the declared purpose.")
if __name__ == "__main__":
demo_healthcare_agent()