-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPyPaC_engine.py
More file actions
115 lines (92 loc) · 4.25 KB
/
PyPaC_engine.py
File metadata and controls
115 lines (92 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# PyPaC_engine.py (WIP/draft) embeds privacy compliance into Python applications
# Not ready for production
import datetime
from typing import Any, Dict, List, Optional, Callable
class PrivacyViolation(Exception):
    """Error type raised when a data access request breaks a privacy rule."""
class AccessLogEntry:
    """One audit record describing a single access to a field.

    Attributes:
        timestamp: Local (naive) time at which the entry was created.
        field: Name of the field that was accessed.
        actor: Identifier of whoever performed the access.
        purpose: Purpose stated for the access.
        processor: Processor on whose behalf the access happened.
    """

    def __init__(self, field: str, actor: str, purpose: str, processor: str):
        """Capture the access details and stamp them with the current time."""
        self.timestamp = datetime.datetime.now()
        self.field, self.actor = field, actor
        self.purpose, self.processor = purpose, processor

    def __repr__(self):
        """Return a single-line, human-readable rendering of the entry."""
        rendered = f"[{self.timestamp}] Field='{self.field}' Actor='{self.actor}' Purpose='{self.purpose}' Processor='{self.processor}'"
        return rendered
class PrivacyField:
    """A stored value bundled with the privacy rules that govern reading it.

    Attributes:
        value: The protected value.
        purpose: If set, the only purpose for which the value may be read.
        retention_days: Retention period in days; ``None`` means no expiry.
        consent_required: Whether reads require ``consent_given=True``.
        access_roles: Roles allowed to read the raw value (empty = no
            role restriction).
        created_at: Naive local timestamp taken when the field was built.
        redact_after_expiry: If true, expired reads yield ``"[REDACTED]"``
            instead of raising.
        allowed_processors: Processors allowed to read the value (empty =
            no processor restriction).
        mask_fn: Optional callable producing a masked rendering of the
            value for roles that are not in ``access_roles``.
    """

    def __init__(
        self,
        value: Any,
        purpose: Optional[str] = None,
        retention_days: Optional[int] = None,
        consent_required: bool = False,
        access_roles: Optional[List[str]] = None,
        redact_after_expiry: bool = False,
        allowed_processors: Optional[List[str]] = None,
        mask_fn: Optional[Callable[[Any], str]] = None,
    ):
        """Store the value and its policy; record the creation time."""
        self.value = value
        self.purpose = purpose
        self.retention_days = retention_days
        self.consent_required = consent_required
        self.access_roles = access_roles or []
        self.created_at = datetime.datetime.now()
        self.redact_after_expiry = redact_after_expiry
        self.allowed_processors = allowed_processors or []
        self.mask_fn = mask_fn

    def is_expired(self) -> bool:
        """Return True once more than ``retention_days`` have elapsed.

        A field without a retention period never expires.
        """
        if self.retention_days is None:
            return False
        # Compare the full elapsed duration rather than the truncated
        # ``.days`` count; truncation kept data alive for up to one extra
        # day beyond the configured retention period.
        elapsed = datetime.datetime.now() - self.created_at
        return elapsed > datetime.timedelta(days=self.retention_days)

    def get_value(self, role: Optional[str], consent_given: bool, purpose: Optional[str], processor: Optional[str]) -> Any:
        """Return the value if every policy check passes.

        Checks run in this order: processor, consent, purpose, role,
        expiry.

        Args:
            role: Role of the requester. A falsy role skips the role check.
            consent_given: Whether the data subject has given consent.
            purpose: Purpose stated by the requester.
            processor: Processor on whose behalf the value is read.

        Returns:
            The raw value; ``mask_fn(value)`` when the role is not in
            ``access_roles`` and a mask function is configured; or
            ``"[REDACTED]"`` when the field is expired and
            ``redact_after_expiry`` is set.

        Raises:
            PrivacyViolation: If the processor is not allowed, consent is
                missing, the purpose differs, the role is denied and no
                mask function exists, or the data is expired without
                ``redact_after_expiry``.
        """
        if self.allowed_processors and processor not in self.allowed_processors:
            raise PrivacyViolation(f"Processor '{processor}' not allowed for this field")
        if self.consent_required and not consent_given:
            raise PrivacyViolation("Consent required")
        if self.purpose and self.purpose != purpose:
            raise PrivacyViolation(f"Purpose mismatch: expected {self.purpose}, got {purpose}")

        # NOTE(review): a falsy role (None / "") bypasses the role check
        # even when access_roles is non-empty. Kept as-is for backward
        # compatibility -- confirm this fail-open behaviour is intended.
        role_denied = bool(role and self.access_roles and role not in self.access_roles)

        # Previously a denied role always raised here, which made the
        # mask_fn branch below unreachable. Only raise when there is no
        # masked rendering to fall back to.
        if role_denied and self.mask_fn is None:
            raise PrivacyViolation(f"Access denied for role: {role}")

        if self.is_expired():
            if self.redact_after_expiry:
                return "[REDACTED]"
            raise PrivacyViolation("Data expired")

        if role_denied:
            return self.mask_fn(self.value)
        return self.value
class PrivacyModel:
    """Per-user container of privacy-governed fields with audit logging.

    Attributes:
        user_id: Identifier of the user the data belongs to.
        data: Mapping of field name to its ``PrivacyField``.
        access_log: Entries appended by each successful ``get`` call.
        user_requests: Timestamped strings added by ``record_request``.
    """

    def __init__(self, user_id: str, data: Dict[str, "PrivacyField"]):
        """Bind the user's fields and start with empty logs."""
        self.user_id = user_id
        self.data = data
        self.access_log: List["AccessLogEntry"] = []
        self.user_requests: List[str] = []

    def get(self, key: str, role: Optional[str] = None, consent_given: bool = False,
            purpose: Optional[str] = None, processor: Optional[str] = None, actor: Optional[str] = "unknown"):
        """Read a field through its policy checks and log the access.

        Args:
            key: Name of the field to read.
            role: Role of the requester, forwarded to the field.
            consent_given: Whether consent was given, forwarded to the field.
            purpose: Stated purpose, forwarded to the field.
            processor: Processor identifier, forwarded to the field.
            actor: Who performs the access; logged as ``"unknown"`` when
                omitted or falsy.

        Returns:
            Whatever the field's ``get_value`` returns.

        Raises:
            KeyError: If ``key`` is not present in ``data``.

        Any exception raised by the field's ``get_value`` propagates, and
        in that case no log entry is written.
        """
        field = self.data.get(key)
        if field is None:
            raise KeyError(f"Field '{key}' not found")
        value = field.get_value(role, consent_given, purpose, processor)
        # Fall back to placeholders so log entries never carry None,
        # including when a caller passes actor=None explicitly.
        self.access_log.append(
            AccessLogEntry(
                field=key,
                actor=actor or "unknown",
                purpose=purpose or "unspecified",
                processor=processor or "unspecified",
            )
        )
        return value

    def export_user_data(self) -> Dict[str, Any]:
        """Return field values by name, with ``"[EXPIRED]"`` for expired fields."""
        return {
            name: ("[EXPIRED]" if field.is_expired() else field.value)
            for name, field in self.data.items()
        }

    def delete_expired_fields(self):
        """Drop expired fields; scrub the ones flagged ``redact_after_expiry``.

        Expired fields with ``redact_after_expiry`` set stay in ``data`` as
        placeholders, but their stored value is overwritten with
        ``"[REDACTED]"`` so the expired cleartext is no longer retained.
        """
        kept = {}
        for name, field in self.data.items():
            if not field.is_expired():
                kept[name] = field
            elif field.redact_after_expiry:
                field.value = "[REDACTED]"
                kept[name] = field
        self.data = kept

    def redact_all(self):
        """Overwrite every field's stored value with ``"[REDACTED]"``."""
        for field in self.data.values():
            field.value = "[REDACTED]"

    def purge_user_data(self):
        """Remove all fields from ``data``."""
        self.data.clear()

    def record_request(self, request_type: str):
        """Append ``"<ISO timestamp> - <request_type>"`` to the request log."""
        timestamp = datetime.datetime.now().isoformat()
        self.user_requests.append(f"{timestamp} - {request_type}")

    def get_access_log(self) -> List["AccessLogEntry"]:
        """Return the access log list itself (not a copy)."""
        return self.access_log

    def get_request_log(self) -> List[str]:
        """Return the request log list itself (not a copy)."""
        return self.user_requests