-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
351 lines (308 loc) · 12.5 KB
/
auth.py
File metadata and controls
351 lines (308 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""Auth + RBAC module — Entra-ready, profile-aware identity extraction.
Pure module modulo `clients.sql_connect()` for the user upsert. Unit-testable
by injecting a fake header dict + a dummy cursor.
Two identity sources, one Principal abstraction:
Azure (Static Web Apps / Front Door / Functions Easy Auth):
HTTP request carries `x-ms-client-principal` — a base64-encoded JSON
document with the user's claims (Object ID, UPN, name, app roles).
Easy Auth validates the JWT before injecting this header; downstream
code can trust it.
Local docker-compose:
No Easy Auth. The frontend sends `X-Reviewer: <upn>`. We trust it (POC
only — never run with public exposure). Lazy-creates a dbo.AppUser row
with AzureOid='local:<upn>' and grants the 'admin' role on first sight
so the developer doesn't fight permissions while iterating.
Role taxonomy (closed set):
reader - read contracts + run queries
reviewer - reader + edit fields + transition to in_review / needs_review
approver - reviewer + transition to approved/rejected; manage flags
admin - approver + manage users / role assignments
Role hierarchy: a higher role implies all lower roles via _ROLE_RANK (a role's
rank is its index in ROLES), so `principal.has_role('reviewer')` returns True
for an approver / admin too.
"""
from __future__ import annotations
import base64
import json
import logging
import uuid
from dataclasses import dataclass, field
from typing import Any, Mapping
from shared import clients
# Module-level logger for this auth module.
LOG = logging.getLogger(__name__)
# Closed set of roles, lowest-privilege first. A role's index in this tuple
# is its rank, and ranks are what role comparisons use.
ROLES = ("reader", "reviewer", "approver", "admin")
# Role name -> rank. A higher role implies all lower roles: has_role('reviewer')
# matches anyone holding a role whose rank is >= reviewer's.
_ROLE_RANK: dict[str, int] = {r: i for i, r in enumerate(ROLES)}
# Entra App Role claim values (configured in the Entra app registration
# manifest) → our internal role names. A claim that already equals a known
# internal role name is accepted verbatim by the parser; this map exists for
# the capitalised spellings and any future renames.
_ENTRA_ROLE_MAP: dict[str, str] = {
    "Reader": "reader",
    "Reviewer": "reviewer",
    "Approver": "approver",
    "Admin": "admin",
}
# Easy Auth claim type URIs. Different identity providers spell these
# differently; each tuple lists the known forms in priority order and the
# first one present in the request wins.
_OID_CLAIM_TYPES = (
    "http://schemas.microsoft.com/identity/claims/objectidentifier",
    "oid",
)
_UPN_CLAIM_TYPES = (
    "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/upn",
    "upn",
    "preferred_username",
    "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress",
)
_NAME_CLAIM_TYPES = (
    "name",
    "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/name",
)
# Role claims may repeat; every value found under either type is considered.
_ROLE_CLAIM_TYPES = (
    "roles",
    "http://schemas.microsoft.com/ws/2008/06/identity/claims/role",
)
_TID_CLAIM_TYPES = (
    "http://schemas.microsoft.com/identity/claims/tenantid",
    "tid",
)
@dataclass(frozen=True)
class Principal:
    """An authenticated user as seen by the application.

    Built from the request headers together with the matching dbo.AppUser
    row; this is the value `current_principal(...)` returns.

    `roles` combines (a) the Entra App Role claims carried by the request
    and (b) the user's dbo.AppRoleAssignment rows. Local development seeds
    'admin' the first time a user is seen; in production Entra is the
    source of truth.
    """

    user_id: str  # dbo.AppUser.UserId (our internal UUID)
    oid: str  # Entra ObjectId, or 'local:<upn>'
    upn: str  # email-shaped: dave@pattens.org
    display_name: str | None
    tenant_id: str | None
    business_unit: str | None
    roles: frozenset[str] = field(default_factory=frozenset)
    is_local: bool = False  # True when oid starts with 'local:' (no real auth)

    def has_role(self, role: str) -> bool:
        """Report whether this principal holds `role` or anything ranked above it.

        A role name outside the closed set never matches (closed-set safety),
        and held roles that are not in the closed set are ignored.
        """
        required = _ROLE_RANK.get(role)
        if required is None:
            return False
        for held in self.roles:
            # Unknown held roles rank below every real role.
            if _ROLE_RANK.get(held, -1) >= required:
                return True
        return False
class AuthError(Exception):
    """Authentication / authorization failure.

    Raised either because the request carries no usable identity (401) or
    because the principal is missing a required role (403).

    Attributes:
        status: HTTP-style status code (401 or 403 in this module).
        code: short machine-readable error code.
        message: human-readable description; also the exception's text.
    """

    def __init__(self, status: int, code: str, message: str) -> None:
        self.status, self.code, self.message = status, code, message
        super().__init__(message)
def current_principal(headers: Mapping[str, str]) -> Principal:
    """Extract the authenticated principal from request headers, lazy-upsert
    the dbo.AppUser row, and return a Principal.

    Lookup order:
      1. x-ms-client-principal (Easy Auth) — preferred on Azure
      2. x-ms-client-principal-name + x-ms-client-principal-id — partial Easy Auth
      3. X-Reviewer — local docker-compose only

    Header names are matched case-insensitively, including when `headers`
    is a plain (case-sensitive) dict. A header with an empty value counts as
    absent.

    Args:
        headers: request headers, name -> value.

    Returns:
        The resolved Principal.

    Raises:
        AuthError: 401 `invalid_principal` if x-ms-client-principal is
            present but is not base64-encoded JSON describing an object
            (or lacks an oid / upn claim); 401 `missing_principal` if none
            of the identity headers are present.
    """
    # HTTP header names are case-insensitive, but `headers` may be a plain
    # dict (e.g. a fake injected by a unit test). Index the non-empty values
    # by lowercased name once, rather than guessing at a few fixed casings
    # (which misses Title-Case spellings such as 'X-Reviewer').
    folded: dict[str, str] = {}
    for key, value in headers.items():
        if value:
            folded.setdefault(str(key).lower(), value)

    def h(name: str) -> str | None:
        # Ask the mapping first so natively case-insensitive header objects
        # keep their own semantics; fall back to the folded index.
        return headers.get(name) or folded.get(name.lower())

    principal_b64 = h("x-ms-client-principal")
    if principal_b64:
        try:
            payload = json.loads(base64.b64decode(principal_b64).decode("utf-8"))
        except (ValueError, TypeError, RecursionError) as e:
            # ValueError covers bad base64, bad UTF-8 and bad JSON alike.
            LOG.warning("auth.invalid_easy_auth_payload: %s", e)
            raise AuthError(
                401, "invalid_principal", "malformed x-ms-client-principal header"
            ) from e
        if not isinstance(payload, dict):
            # Valid JSON, but not the claims envelope (e.g. a list or string).
            LOG.warning(
                "auth.invalid_easy_auth_payload: %s",
                f"expected a JSON object, got {type(payload).__name__}",
            )
            raise AuthError(
                401, "invalid_principal", "malformed x-ms-client-principal header"
            )
        return _principal_from_easy_auth(payload)

    upn_only = h("x-ms-client-principal-name")
    if upn_only:
        # Partial Easy Auth (some Static Web Apps tiers don't inject the
        # full claims envelope, only these two scalar headers). Synthesize
        # a minimal principal; roles default to 'reader'. An admin must
        # then grant elevated roles via dbo.AppRoleAssignment.
        oid_only = h("x-ms-client-principal-id")
        has_oid = bool(oid_only)
        # NOTE(review): when the -id header is absent this request is treated
        # as local (oid 'local:<upn>', is_local=True), and _upsert_user seeds
        # the 'admin' role for local users on first sight — despite the
        # 'reader' claim default below. Confirm that is intended before
        # exposing this path beyond local development.
        return _resolve_principal(
            oid=oid_only if has_oid else f"local:{upn_only}",
            upn=upn_only,
            display_name=None,
            tenant_id=None,
            entra_roles=frozenset({"reader"}),
            # Derived from the same test as `oid` so the two cannot disagree
            # when the id header is present but empty.
            is_local=not has_oid,
        )

    reviewer = h("x-reviewer")
    if reviewer:
        # Local-only path. Trust the header. Grant 'admin' on first sight
        # so the developer can exercise approve/reject without ceremony.
        # display_name=None so the seeded value in dbo.AppUser ('Dev Local'
        # for the docker-compose default user) sticks across requests rather
        # than being overwritten by the UPN every call.
        return _resolve_principal(
            oid=f"local:{reviewer}",
            upn=reviewer,
            display_name=None,
            tenant_id=None,
            entra_roles=frozenset({"admin"}),
            is_local=True,
        )

    raise AuthError(
        401, "missing_principal",
        "no x-ms-client-principal (Easy Auth), x-ms-client-principal-name, "
        "or X-Reviewer header on the request",
    )
def require_role(principal: Principal, role: str) -> None:
    """Enforce that `principal` holds `role` or a higher-ranked one.

    Returns None when the check passes; otherwise raises AuthError with
    status 403 and code 'forbidden', naming the principal's UPN, the roles
    it does hold, and the role that was required.
    """
    if principal.has_role(role):
        return
    detail = (
        f"principal {principal.upn} (roles={sorted(principal.roles)}) "
        f"lacks required role {role!r}"
    )
    raise AuthError(403, "forbidden", detail)
# ---------- Internals ----------
def _principal_from_easy_auth(payload: dict[str, Any]) -> Principal:
    """Parse the Easy Auth JSON envelope into a Principal.

    Args:
        payload: the decoded x-ms-client-principal document. Its `claims`
            entry is read as a list of objects with `typ` / `val` keys;
            anything that does not have that shape is ignored.

    Returns:
        The Principal resolved (and upserted) for the claimed identity.

    Raises:
        AuthError: 401 `invalid_principal` when no object-id or UPN can be
            found in the payload.
    """
    claims = payload.get("claims")
    if not isinstance(claims, (list, tuple)):
        # Missing or malformed claims: treat as empty so the request ends in
        # the 401 below rather than an unhandled TypeError.
        claims = ()

    # claim type -> every value seen for it, in document order.
    by_type: dict[str, list[str]] = {}
    for claim in claims:
        if not isinstance(claim, dict):
            continue
        typ = claim.get("typ")
        val = claim.get("val")
        if not isinstance(typ, str) or not typ or val is None:
            continue
        by_type.setdefault(typ, []).append(str(val))

    def first(types: tuple[str, ...]) -> str | None:
        # First value of the first claim type (in priority order) present.
        for t in types:
            values = by_type.get(t)
            if values:
                return values[0]
        return None

    oid = first(_OID_CLAIM_TYPES)
    # Only a string `user_id` is usable as a fallback UPN.
    fallback_upn = payload.get("user_id")
    if not isinstance(fallback_upn, str):
        fallback_upn = ""
    upn = first(_UPN_CLAIM_TYPES) or fallback_upn
    if not oid or not upn:
        raise AuthError(
            401, "invalid_principal",
            "Easy Auth payload missing oid or upn claim",
        )

    display = first(_NAME_CLAIM_TYPES)
    tenant_id = first(_TID_CLAIM_TYPES)

    raw_roles: set[str] = set()
    for t in _ROLE_CLAIM_TYPES:
        for v in by_type.get(t, ()):  # may appear multiple times
            # Translate Entra spellings; accept internal names verbatim;
            # drop anything outside the closed role set.
            mapped = _ENTRA_ROLE_MAP.get(v) or (v if v in _ROLE_RANK else None)
            if mapped:
                raw_roles.add(mapped)
    if not raw_roles:
        # No app roles assigned in Entra → baseline 'reader' from the claim
        # side; _resolve_principal merges in any dbo role assignments.
        raw_roles = {"reader"}

    return _resolve_principal(
        oid=oid,
        upn=upn,
        display_name=display,
        tenant_id=tenant_id,
        entra_roles=frozenset(raw_roles),
        is_local=False,
    )
def _resolve_principal(
    *,
    oid: str,
    upn: str,
    display_name: str | None,
    tenant_id: str | None,
    entra_roles: frozenset[str],
    is_local: bool,
) -> Principal:
    """Upsert the user and assemble the consolidated Principal.

    Delegates persistence to `_upsert_user`, then unions the roles claimed
    by the request (`entra_roles`) with the roles stored in the database.

    The display name on the result prefers the value `_upsert_user` reports
    as persisted (which may be more descriptive than the request claim —
    e.g. the seeded 'Dev Local' for the local fallback user) and only falls
    back to the request's `display_name` when that is empty.
    """
    upserted = _upsert_user(
        oid=oid,
        upn=upn,
        display_name=display_name,
        tenant_id=tenant_id,
        is_local=is_local,
    )
    user_id, business_unit, db_roles, persisted_display = upserted

    shown_name = persisted_display if persisted_display else display_name
    combined_roles = entra_roles | db_roles

    return Principal(
        user_id=user_id,
        oid=oid,
        upn=upn,
        display_name=shown_name,
        tenant_id=tenant_id,
        business_unit=business_unit,
        roles=combined_roles,
        is_local=is_local,
    )
def _upsert_user(
    *,
    oid: str,
    upn: str,
    display_name: str | None,
    tenant_id: str | None,
    is_local: bool,
) -> tuple[str, str | None, frozenset[str], str | None]:
    """Look up dbo.AppUser by AzureOid, creating or refreshing the row.

    Existing user: refreshes LastSeenAt and the UPN (Entra UPNs can change;
    the OID is the stable key), overwrites DisplayName / TenantId only when
    the request supplied a non-null value, and reads the user's role rows.

    New user: inserts the row under a fresh UUID plus one default role
    assignment — 'admin' (source 'local-default') when `is_local`, otherwise
    'reader' (source 'entra').

    Returns:
        (user_id, business_unit, dbo_roles, persisted_display_name).
        `business_unit` is None for a newly created user.
    """
    with clients.sql_connect() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT UserId, BusinessUnit, DisplayName FROM dbo.AppUser WHERE AzureOid = ?",
            oid,
        )
        existing = cursor.fetchone()

        if existing is not None:
            user_id = str(existing[0])
            business_unit = existing[1]
            persisted_display = existing[2]
            # Refresh LastSeenAt + UPN drift in one statement. DisplayName is
            # only updated if the request brought a non-null value, so the
            # seeded 'Dev Local' for the local fallback user survives across
            # requests where the X-Reviewer flow can only supply a UPN.
            cursor.execute(
                """
                UPDATE dbo.AppUser
                SET LastSeenAt = SYSUTCDATETIME(),
                Upn = ?,
                DisplayName = COALESCE(?, DisplayName),
                TenantId = COALESCE(?, TenantId)
                WHERE UserId = ?
                """,
                upn, display_name, tenant_id, user_id,
            )
            cursor.execute(
                "SELECT Role FROM dbo.AppRoleAssignment WHERE UserId = ?",
                user_id,
            )
            assigned = frozenset(role_row[0] for role_row in cursor.fetchall())
            conn.commit()
            return user_id, business_unit, assigned, (display_name or persisted_display)

        # First sight of this OID: create the user and its default role.
        user_id = str(uuid.uuid4())
        cursor.execute(
            """
            INSERT INTO dbo.AppUser
            (UserId, AzureOid, Upn, DisplayName, TenantId)
            VALUES (?, ?, ?, ?, ?)
            """,
            user_id, oid, upn, display_name, tenant_id,
        )
        # First-sight default role: 'admin' for local dev (so developers
        # can hit approve/reject), 'reader' for Entra-authenticated.
        if is_local:
            default_role, source = "admin", "local-default"
        else:
            default_role, source = "reader", "entra"
        cursor.execute(
            """
            INSERT INTO dbo.AppRoleAssignment
            (UserId, Role, Source) VALUES (?, ?, ?)
            """,
            user_id, default_role, source,
        )
        conn.commit()
        LOG.info(
            "auth.user_created user_id=%s upn=%s default_role=%s",
            user_id, upn, default_role,
        )
        return user_id, None, frozenset({default_role}), display_name