|
| 1 | +# Copyright © 2024 Province of British Columbia |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | +"""Keycloak admin functions – same pattern as submit-api KeycloakService.""" |
| 15 | +import requests |
| 16 | +from flask import current_app |
| 17 | + |
| 18 | + |
| 19 | +# Same group path as submit-api (SUBMIT / EAO_MANAGER) |
| 20 | +EAO_MANAGER_GROUP_PATH = "SUBMIT/EAO_MANAGER" |
| 21 | + |
| 22 | + |
| 23 | +class KeycloakService: |
| 24 | + """Keycloak admin API – same token and request pattern as submit-api.""" |
| 25 | + |
| 26 | + @staticmethod |
| 27 | + def _get_admin_token(): |
| 28 | + """Create an admin token (same as submit-api KeycloakService._get_admin_token).""" |
| 29 | + config = current_app.config |
| 30 | + base_url = config.get("KEYCLOAK_BASE_URL") |
| 31 | + realm = config.get("KEYCLOAK_REALM_NAME") |
| 32 | + admin_client_id = config.get("KEYCLOAK_EMAILER_CLIENT") |
| 33 | + admin_secret = config.get("KEYCLOAK_EMAILER_SECRET") |
| 34 | + timeout = int(config.get("CONNECT_TIMEOUT", 60)) |
| 35 | + token_url = f"{base_url}/auth/realms/{realm}/protocol/openid-connect/token" |
| 36 | + |
| 37 | + if not admin_client_id or not admin_secret: |
| 38 | + raise ValueError( |
| 39 | + "KEYCLOAK_EMAILER_CLIENT and KEYCLOAK_EMAILER_SECRET must be set in .env" |
| 40 | + ) |
| 41 | + |
| 42 | + headers = {"Content-Type": "application/x-www-form-urlencoded"} |
| 43 | + # Use dict so requests form-encodes correctly (handles special chars in secret) |
| 44 | + data = { |
| 45 | + "client_id": admin_client_id, |
| 46 | + "grant_type": "client_credentials", |
| 47 | + "client_secret": admin_secret, |
| 48 | + } |
| 49 | + response = requests.post( |
| 50 | + token_url, |
| 51 | + data=data, |
| 52 | + headers=headers, |
| 53 | + timeout=timeout, |
| 54 | + ) |
| 55 | + response.raise_for_status() |
| 56 | + return response.json().get("access_token") |
| 57 | + |
| 58 | + @staticmethod |
| 59 | + def _request_keycloak(relative_url: str): |
| 60 | + """GET request to Keycloak admin API (same URL pattern as submit-api).""" |
| 61 | + base_url = current_app.config.get("KEYCLOAK_BASE_URL") |
| 62 | + realm = current_app.config.get("KEYCLOAK_REALM_NAME") |
| 63 | + timeout = int(current_app.config.get("CONNECT_TIMEOUT", 60)) |
| 64 | + admin_token = KeycloakService._get_admin_token() |
| 65 | + headers = { |
| 66 | + "Content-Type": "application/json", |
| 67 | + "Authorization": f"Bearer {admin_token}", |
| 68 | + } |
| 69 | + url = f"{base_url}/auth/admin/realms/{realm}/{relative_url}" |
| 70 | + response = requests.get(url, headers=headers, timeout=timeout) |
| 71 | + response.raise_for_status() |
| 72 | + return response |
| 73 | + |
| 74 | + @staticmethod |
| 75 | + def get_groups(brief_representation: bool = False): |
| 76 | + """Get all top-level groups.""" |
| 77 | + response = KeycloakService._request_keycloak( |
| 78 | + f"groups?briefRepresentation={brief_representation}" |
| 79 | + ) |
| 80 | + return response.json() |
| 81 | + |
| 82 | + @staticmethod |
| 83 | + def get_sub_groups(group_id: str): |
| 84 | + """Return the subgroups of given group.""" |
| 85 | + response = KeycloakService._request_keycloak(f"groups/{group_id}/children") |
| 86 | + return response.json() |
| 87 | + |
| 88 | + @staticmethod |
| 89 | + def get_group_id_by_path(group_path: str) -> str: |
| 90 | + """Find a Keycloak group by full path (e.g. 'SUBMIT/EAO_MANAGER') and return its ID.""" |
| 91 | + segments = group_path.strip("/").split("/") |
| 92 | + current_groups = KeycloakService.get_groups(brief_representation=True) |
| 93 | + current_group = None |
| 94 | + |
| 95 | + for segment in segments: |
| 96 | + matched = next((g for g in current_groups if g["name"] == segment), None) |
| 97 | + if not matched: |
| 98 | + raise ValueError(f"Group segment '{segment}' not found.") |
| 99 | + current_group = matched |
| 100 | + current_groups = KeycloakService.get_sub_groups(current_group["id"]) |
| 101 | + |
| 102 | + return current_group["id"] |
| 103 | + |
| 104 | + @staticmethod |
| 105 | + def get_members_for_group(group_id: str): |
| 106 | + """Get the members of a group (Keycloak user objects with email, etc.).""" |
| 107 | + response = KeycloakService._request_keycloak(f"groups/{group_id}/members") |
| 108 | + return response.json() |
| 109 | + |
| 110 | + @classmethod |
| 111 | + def get_eao_manager_emails(cls) -> list: |
| 112 | + """Return email addresses of SUBMIT/EAO_MANAGER group members (same as submit-api flow).""" |
| 113 | + try: |
| 114 | + group_id = cls.get_group_id_by_path(EAO_MANAGER_GROUP_PATH) |
| 115 | + members = cls.get_members_for_group(group_id) |
| 116 | + return [m.get("email") for m in members if m.get("email")] |
| 117 | + except (ValueError, requests.RequestException): |
| 118 | + return [] |
0 commit comments