From 034223b89a710a122d0140b475c1ca6347826b2a Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Fri, 15 May 2026 09:58:29 +0100 Subject: [PATCH 1/5] CDD-3173: add the base code for the DB permission filtering of private APIs (using /api/charts/v3 as the first example) --- common/auth/cognito_jwt/backend.py | 3 +- common/auth/cognito_jwt/user_manager.py | 33 +++++++- .../views/charts/single_category_charts.py | 9 ++- metrics/data/managers/core_models/headline.py | 58 +++++++++++--- .../data/managers/core_models/time_series.py | 78 +++++++++++++++---- metrics/data/managers/permissions.py | 70 +++++++++++++++++ metrics/domain/models/charts/common.py | 26 +++++++ metrics/interfaces/plots/access.py | 3 +- metrics/utils/permission_hierarchy.py | 48 ++++++++++++ 9 files changed, 296 insertions(+), 32 deletions(-) create mode 100644 metrics/data/managers/permissions.py diff --git a/common/auth/cognito_jwt/backend.py b/common/auth/cognito_jwt/backend.py index 1fed42fec..c047cc9d9 100644 --- a/common/auth/cognito_jwt/backend.py +++ b/common/auth/cognito_jwt/backend.py @@ -46,7 +46,8 @@ def authenticate(self, request): try: token_validator = self.get_token_validator(request) jwt_payload = token_validator.validate(jwt_token) - except TokenError: + except TokenError as error: + logger.warning("JWT validation failed: %s", error) raise exceptions.AuthenticationFailed from None custom_user_manager = self.get_custom_user_manager() diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py index 9d8c522c8..207ee989b 100644 --- a/common/auth/cognito_jwt/user_manager.py +++ b/common/auth/cognito_jwt/user_manager.py @@ -3,6 +3,8 @@ from django.contrib.auth import get_user_model from django.contrib.auth.models import BaseUserManager +from metrics.utils.permission_hierarchy import convert_permission_set_into_hierarchy + logger = logging.getLogger(__name__) @@ -15,8 +17,23 @@ def get_or_create_for_cognito(jwt_payload): so this speeds up the request by removing the need for any DB access """ try: - username = jwt_payload["entraObjectId"] - permission_sets = jwt_payload["permissionSets"] + # username = jwt_payload["entraObjectId"] + # raw_permission_sets = jwt_payload["permissionSets"] + + # TODO: Testing + username = '678a605b-16f3-4342-9f02-db74613701ac' + raw_permission_sets = { + "permission_sets": [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": None}, + } + ], + "summary": { + "has_global_access": False + }, + } except KeyError: logger.debug( "Error getting entraObjectId and/or permissionSets field(s)" @@ -25,7 +42,19 @@ def get_or_create_for_cognito(jwt_payload): ) return None + permission_sets = convert_permission_set_into_hierarchy(raw_permission_sets) + permission_count = len(permission_sets.get("permission_set_hierarchy", [])) + has_global_access = bool(permission_sets.get("has_global_access", False)) + + logger.info( + "JWT token for user '%s' with permissions: permission_count=%d, has_global_access=%s", + username, + permission_count, + has_global_access, + ) + user_class = get_user_model() user = user_class(username=username) user.permission_sets = permission_sets + return user diff --git a/metrics/api/views/charts/single_category_charts.py b/metrics/api/views/charts/single_category_charts.py index 2767c3982..29453f3e7 100644 --- a/metrics/api/views/charts/single_category_charts.py +++ b/metrics/api/views/charts/single_category_charts.py @@ -4,9 +4,12 @@ from django.http import FileResponse from drf_spectacular.utils import OpenApiExample, extend_schema from rest_framework import permissions +from rest_framework.authentication import SessionAuthentication from rest_framework.response import Response from rest_framework.views import APIView +from common.auth.cognito_jwt import JSONWebTokenAuthentication + import config from caching.private_api.decorators import cache_response from metrics.api.decorators.auth import require_authorisation @@ -26,7 +29,6 @@ CHARTS_API_TAG = "charts" - class ChartsView(APIView): permission_classes = [] @@ -218,6 +220,7 @@ def post(cls, request, *args, **kwargs): class EncodedChartsView(APIView): + authentication_classes = [SessionAuthentication, JSONWebTokenAuthentication] permission_classes = [] @classmethod @@ -340,8 +343,6 @@ def post(cls, request, *args, **kwargs): serializer.is_valid(raise_exception=True) except (InvalidPlotParametersError, DataNotFoundForAnyPlotError) as error: - return Response( - status=HTTPStatus.BAD_REQUEST, data={"error_message": str(error)} - ) + return Response(status=HTTPStatus.BAD_REQUEST, data={"error_message": str(error)}) return Response(data=serializer.data) diff --git a/metrics/data/managers/core_models/headline.py b/metrics/data/managers/core_models/headline.py index f33bf913c..8a2801f8e 100644 --- a/metrics/data/managers/core_models/headline.py +++ b/metrics/data/managers/core_models/headline.py @@ -15,6 +15,10 @@ from metrics.api.permissions.fluent_permissions import ( validate_permissions_for_non_public, ) +from metrics.data.managers.permissions import ( + check_permissions_hierarchy, +) +from metrics.data.models.core_models.supporting import Topic class CoreHeadlineQuerySet(models.QuerySet): @@ -334,6 +338,7 @@ def query_for_data( theme: str = "", sub_theme: str = "", rbac_permissions: Iterable["RBACPermission"] | None = None, + jwt_permissions: dict | None = None, **kwargs, ): """Filters for a N-item list of dicts by the given params if `fields_to_export` is used. @@ -364,7 +369,7 @@ def query_for_data( Note that options are `M`, `F`, or `ALL`. age: The age range to apply additional filtering to. E.g. `0_4` would be used to capture the age of 0-4 years old - theme: The name of the theme being queried. + theme: The name of the theme being queried. This is only used to determine permissions for the non-public portion of the requested dataset. sub_theme: The name of the sub theme being queried. @@ -373,6 +378,9 @@ def query_for_data( rbac_permissions: The RBAC permissions available to the given request. This dictates whether the given request is permitted access to non-public data or not. + jwt_permissions: JWT permissions dict extracted from Cognito token. + Contains 'has_global_access' (bool) and 'permission_set_hierarchy' (list). + Used for new JWT-based authorization (takes precedence over RBAC permissions). Returns: Queryset of (x_axis, y_axis) where x_axis represents the variable on the x_axis @@ -383,15 +391,45 @@ def query_for_data( """ rbac_permissions = rbac_permissions or [] - has_access_to_non_public_data: bool = validate_permissions_for_non_public( - theme=theme, - sub_theme=sub_theme, - topic=topic, - metric=metric, - geography=geography, - geography_type=geography_type, - rbac_permissions=rbac_permissions, - ) + jwt_permissions = jwt_permissions or {} + + # Check JWT permissions first (new authorization, takes precedence) + has_access_to_non_public_data: bool = False + + if jwt_permissions: + has_global_access = jwt_permissions.get("has_global_access", False) + if has_global_access: + has_access_to_non_public_data = True + else: + permission_set_hierarchy = jwt_permissions.get( + "permission_set_hierarchy", [] + ) + topic_record = Topic.objects.filter( + name=topic, + sub_theme__name=sub_theme, + sub_theme__theme__name=theme, + ).first() + + if topic_record: + has_access_to_non_public_data = check_permissions_hierarchy( + user_permission_hierarchy=permission_set_hierarchy, + theme_id=str(topic_record.sub_theme.theme_id), + sub_theme_id=str(topic_record.sub_theme_id), + topic_id=str(topic_record.id), + ) + else: + has_access_to_non_public_data = False + else: + # LEGACY PERMISSIONS (NOT IN USE) - Fallback to RBAC if no JWT permissions + has_access_to_non_public_data = validate_permissions_for_non_public( + theme=theme, + sub_theme=sub_theme, + topic=topic, + metric=metric, + geography=geography, + geography_type=geography_type, + rbac_permissions=rbac_permissions, + ) if has_access_to_non_public_data: queryset = self.get_queryset().get_all_headlines_released_from_embargo( diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index aed8df7f6..73c33833c 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -17,6 +17,10 @@ is_public_data_only_enforced, validate_permissions_for_non_public, ) +from metrics.data.managers.permissions import ( + check_permissions_hierarchy, +) +from metrics.data.models.core_models.supporting import Topic from metrics.data.models import RBACPermission ALLOWABLE_METRIC_VALUE_RANGE_TYPE = tuple[str | float | int, str | float | int] @@ -533,6 +537,7 @@ def query_for_data( sub_theme: str = "", metric_value_ranges: list[str | float | int] | None = None, rbac_permissions: Iterable[RBACPermission] | None = None, + jwt_permissions: dict | None = None, ) -> CoreTimeSeriesQuerySet: """Filters for a 2-item object by the given params. Slices all values older than the `date_from`. @@ -579,11 +584,14 @@ def query_for_data( i.e. to filter for all record with values between 0 -> 80 AND 90 -> 100, this can be provided as `[(0, 80), (90, 100)]`. - rbac_permissions: The RBAC permissions available - to the given request. This dictates whether the given - request is permitted access to non-public data or not. - - Notes: + rbac_permissions: The RBAC permissions available + to the given request. This dictates whether the given + request is permitted access to non-public data or not. + jwt_permissions: JWT permissions dict extracted from Cognito token. + Contains 'has_global_access' (bool) and 'permission_set_hierarchy' (list). + Used for new JWT-based authorization (takes precedence over RBAC permissions). + + Notes: If we have the following input `queryset`: ---------------------------------------- | 2023-01-01 | 2023-01-02 | 2023-01-03 | @@ -612,15 +620,57 @@ def query_for_data( """ rbac_permissions: Iterable[RBACPermission] = rbac_permissions or [] - has_access_to_non_public_data: bool = validate_permissions_for_non_public( - theme=theme, - sub_theme=sub_theme, - topic=topic, - metric=metric, - geography_type=geography_type, - geography=geography, - rbac_permissions=rbac_permissions, - ) + jwt_permissions = jwt_permissions or {} + + # Only allow access, if permissions checks below are passed + has_access_to_non_public_data: bool = False + + # Check JWT permissions first (new authorization, takes precedence) + if jwt_permissions: + has_global_access = jwt_permissions.get("has_global_access", False) + + if has_global_access: + has_access_to_non_public_data = True + else: + # TODO: All the below is to be replaces by a similar function as + # check_permissions() that returns all the filter columns + # but removes the "-1" filter columns + permission_set_hierarchy = jwt_permissions.get( + "permission_set_hierarchy", [] + ) + + topic_record = None + + if theme and sub_theme: + topic_record = Topic.objects.filter( + name=topic, + sub_theme__name=sub_theme, + sub_theme__theme__name=theme, + ).first() + + if topic_record is None: + topic_record = Topic.objects.filter(name=topic).first() + + if topic_record: + has_access_to_non_public_data = check_permissions_hierarchy( + user_permission_hierarchy=permission_set_hierarchy, + theme_id=str(topic_record.sub_theme.theme_id), + sub_theme_id=str(topic_record.sub_theme_id), + topic_id=str(topic_record.id), + ) + else: + has_access_to_non_public_data = False + else: + # LEGACY PERMISSIONS (NOT IN USE) - Fallback to RBAC if no JWT permissions + has_access_to_non_public_data = validate_permissions_for_non_public( + theme=theme, + sub_theme=sub_theme, + topic=topic, + metric=metric, + geography_type=geography_type, + geography=geography, + rbac_permissions=rbac_permissions, + ) return self.get_queryset().query_for_data( fields_to_export=fields_to_export, diff --git a/metrics/data/managers/permissions.py b/metrics/data/managers/permissions.py new file mode 100644 index 000000000..ca65631c9 --- /dev/null +++ b/metrics/data/managers/permissions.py @@ -0,0 +1,70 @@ +"""JWT-based permission validation for non-public data access.""" + +from typing import TypedDict + + +class PermissionSetLevel(TypedDict): + """Represents a level (theme, sub_theme, or topic) in the permission hierarchy.""" + id: str + name: str + + +class PermissionHierarchy(TypedDict): + """Represents a single permission entry in the user's permission set hierarchy.""" + theme: PermissionSetLevel + sub_theme: PermissionSetLevel + topic: PermissionSetLevel + + +def check_permissions_hierarchy( + *, + user_permission_hierarchy: list[PermissionHierarchy] | None, + theme_id: str, + sub_theme_id: str, + topic_id: str, +) -> bool: + """ + Checks if the user's JWT permissions allow access to the requested non-public data. + """ + + if not user_permission_hierarchy: + return False + + return check_permissions( + user_permission_hierarchy, + theme_id, + sub_theme_id, + topic_id, + ) + + +def check_permissions(user_permissions, theme_id, sub_theme_id, topic_id) -> bool: + """ + TODO: This is an exact copy of cms/dashboard/viewsets.py:check_permissions(), + which needs to be centralized! + + @return True/False whether access allowed or not + """ + + if not isinstance(user_permissions, list): + return False + + for permission in user_permissions: + permission_theme_id = permission.get("theme", {}).get("id") + permission_sub_theme_id = permission.get("sub_theme", {}).get("id") + permission_topic_id = permission.get("topic", {}).get("id") + + if permission_theme_id == "-1": + return True + + if permission_theme_id == theme_id and permission_sub_theme_id == "-1": + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and (permission_topic_id == "-1" or permission_topic_id == topic_id) + ): + return True + + return False diff --git a/metrics/domain/models/charts/common.py b/metrics/domain/models/charts/common.py index 317301450..ed4b28053 100644 --- a/metrics/domain/models/charts/common.py +++ b/metrics/domain/models/charts/common.py @@ -24,6 +24,32 @@ class BaseChartRequestParams(BaseModel): class Config: arbitrary_types_allowed = True + # LEGACY PERMISSIONS (NOT IN USE): RBAC-based permissions will be removed in a future release @property def rbac_permissions(self) -> Iterable["RBACPermission"]: return getattr(self.request, "rbac_permissions", []) + + @property + def jwt_permissions(self) -> dict: + """Extract JWT permissions from the authenticated request. + + Returns a dict with: + - 'has_global_access' (bool): Whether user has access to all data + - 'permission_set_hierarchy' (list): User's permission hierarchy from JWT + + Returns empty dict if not authenticated or no permissions available. + """ + if self.request is None: + return {} + + user = getattr(self.request, "user", None) + if user is None: + return {} + + permission_sets = getattr(user, "permission_sets", {}) + if not permission_sets: + return {} + + return permission_sets + + diff --git a/metrics/interfaces/plots/access.py b/metrics/interfaces/plots/access.py index 6e4eac34d..4e6815019 100644 --- a/metrics/interfaces/plots/access.py +++ b/metrics/interfaces/plots/access.py @@ -162,7 +162,8 @@ def get_queryset_from_core_model_manager( plot_params["fields_to_export"].append("lower_confidence") return self.core_model_manager.query_for_data( - **plot_params, rbac_permissions=self.chart_request_params.rbac_permissions + **plot_params, rbac_permissions=self.chart_request_params.rbac_permissions, + jwt_permissions=self.chart_request_params.jwt_permissions, ) def build_plot_data_from_parameters_with_complete_queryset( diff --git a/metrics/utils/permission_hierarchy.py b/metrics/utils/permission_hierarchy.py index c5a182204..41ad23dda 100644 --- a/metrics/utils/permission_hierarchy.py +++ b/metrics/utils/permission_hierarchy.py @@ -21,6 +21,54 @@ ) +def convert_permission_set_into_hierarchy(raw_permission_sets: dict) -> dict: + """ + Convert a "permission_set" back into a "permission_set_hierarchy" again + (the NormalizedPermission class does it the other way round) + + @param raw_permission_sets {dict} example: + { + "permission_sets": [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "topic": {"id": None, "name": None}, + } + ], + "summary": { + "has_global_access": False + }, + } + + @return {dict} example: + { + "permission_set_hierarchy": [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "topic": {"id": None, "name": None}, + } + ], + "has_global_access": False, + } + """ + + permission_set_hierarchy = raw_permission_sets.get("permission_set_hierarchy") + if permission_set_hierarchy is None: + permission_set_hierarchy = raw_permission_sets.get("permission_sets", []) + + has_global_access = raw_permission_sets.get("has_global_access") + if has_global_access is None: + has_global_access = raw_permission_sets.get("summary", {}).get( + "has_global_access", False + ) + + return { + "permission_set_hierarchy": permission_set_hierarchy, + "has_global_access": bool(has_global_access), + } + + @dataclass class NormalizedPermission: """ From 4d6a4acac3d3058948ce1ea8beb626fc9b7e58cb Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Tue, 19 May 2026 10:29:43 +0100 Subject: [PATCH 2/5] CDD-3173: move permissions.py to folder where the other permission functions/classes are --- metrics/data/managers/core_models/headline.py | 2 +- metrics/data/managers/core_models/time_series.py | 2 +- metrics/{data/managers => utils}/permissions.py | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename metrics/{data/managers => utils}/permissions.py (100%) diff --git a/metrics/data/managers/core_models/headline.py b/metrics/data/managers/core_models/headline.py index 8a2801f8e..3f6fb60a1 100644 --- a/metrics/data/managers/core_models/headline.py +++ b/metrics/data/managers/core_models/headline.py @@ -15,7 +15,7 @@ from metrics.api.permissions.fluent_permissions import ( validate_permissions_for_non_public, ) -from metrics.data.managers.permissions import ( +from metrics.utils.permissions import ( check_permissions_hierarchy, ) from metrics.data.models.core_models.supporting import Topic diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index 73c33833c..5bda7e79f 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -17,7 +17,7 @@ is_public_data_only_enforced, validate_permissions_for_non_public, ) -from metrics.data.managers.permissions import ( +from metrics.utils.permissions import ( check_permissions_hierarchy, ) from metrics.data.models.core_models.supporting import Topic diff --git a/metrics/data/managers/permissions.py b/metrics/utils/permissions.py similarity index 100% rename from metrics/data/managers/permissions.py rename to metrics/utils/permissions.py From b927176c0ae9c7b34261f2b4b8d0127cc43404b5 Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Tue, 19 May 2026 22:37:19 +0100 Subject: [PATCH 3/5] CDD-3173: make DB permission filtering generic for all 6 fields --- common/auth/cognito_jwt/backend.py | 11 +- common/auth/cognito_jwt/user_manager.py | 69 +++- .../views/charts/single_category_charts.py | 8 +- metrics/data/managers/core_models/headline.py | 35 +- .../data/managers/core_models/time_series.py | 45 +-- metrics/domain/models/charts/common.py | 26 -- metrics/interfaces/plots/access.py | 6 +- metrics/utils/permission_hierarchy.py | 6 +- metrics/utils/permissions.py | 301 +++++++++++++++--- 9 files changed, 360 insertions(+), 147 deletions(-) diff --git a/common/auth/cognito_jwt/backend.py b/common/auth/cognito_jwt/backend.py index c047cc9d9..6ac2b5415 100644 --- a/common/auth/cognito_jwt/backend.py +++ b/common/auth/cognito_jwt/backend.py @@ -19,9 +19,9 @@ def get_authorization_header(request): """ Return request's authentication header, as a bytestring. - Hide some test client ickyness where the header can be unicode. """ + auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "Authorization") auth = request.META.get(auth_header, b"") if isinstance(auth, str): @@ -31,13 +31,18 @@ def get_authorization_header(request): class JSONWebTokenAuthentication(BaseAuthentication): - """Token based authentication using the JSON Web Token standard. + """ + Token based authentication using the JSON Web Token standard. Based on https://github.com/labd/django-cognito-jwt and modified to suit our use case """ def authenticate(self, request): - """Entrypoint for Django Rest Framework""" + """ + The JWT token has arrived at an API endpoint and the journey starts here. + Entrypoint for the Django Rest Framework. + """ + jwt_token = self.get_jwt_token(request) if jwt_token is None: return None diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py index 207ee989b..8cc5777b0 100644 --- a/common/auth/cognito_jwt/user_manager.py +++ b/common/auth/cognito_jwt/user_manager.py @@ -1,10 +1,14 @@ import logging +from typing import TYPE_CHECKING from django.contrib.auth import get_user_model from django.contrib.auth.models import BaseUserManager from metrics.utils.permission_hierarchy import convert_permission_set_into_hierarchy +if TYPE_CHECKING: + from rest_framework.request import Request + logger = logging.getLogger(__name__) @@ -12,28 +16,28 @@ class CognitoManager(BaseUserManager): @staticmethod def get_or_create_for_cognito(jwt_payload): - """Create an ephemeral user instance for this request. + """ + Create an ephemeral user instance for this request. We don't need to store or retrieve any info, we use what's in the JWT, so this speeds up the request by removing the need for any DB access """ + try: - # username = jwt_payload["entraObjectId"] - # raw_permission_sets = jwt_payload["permissionSets"] + username = jwt_payload["entraObjectId"] + raw_permission_sets = jwt_payload["permissionSets"] # TODO: Testing - username = '678a605b-16f3-4342-9f02-db74613701ac' - raw_permission_sets = { - "permission_sets": [ - { - "theme": {"id": "100", "name": "immunisation"}, - "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - "topic": {"id": "-1", "name": None}, - } - ], - "summary": { - "has_global_access": False - }, - } + # username = "678a605b-16f3-4342-9f02-db74613701ac" + # raw_permission_sets = { + # "permission_sets": [ + # { + # "theme": {"id": "100", "name": "immunisation"}, + # "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + # "topic": {"id": "-1", "name": None}, + # } + # ], + # "summary": {"has_global_access": False}, + # } except KeyError: logger.debug( "Error getting entraObjectId and/or permissionSets field(s)" @@ -58,3 +62,36 @@ def get_or_create_for_cognito(jwt_payload): user.permission_sets = permission_sets return user + + +def extract_jwt_permissions(*, request: "Request | None") -> dict: + """ + Extract the normalized JWT permissions dict from an authenticated request. + + Reads `request.user.permission_sets`, which is set by CognitoManager + during JWT authentication. Lives here because it is the counterpart to + the code above that "writes" user.permission_sets. + + @param {Request | None} request example: + + + @return {dict} example: + { + "permission_set_hierarchy": [ + {"theme": {"id": "100", "name": "immunisation"}, ...} + ], + "has_global_access": False + } + """ + if request is None: + return {} + + user = getattr(request, "user", None) + if user is None: + return {} + + permission_sets = getattr(user, "permission_sets", {}) + if not permission_sets: + return {} + + return permission_sets diff --git a/metrics/api/views/charts/single_category_charts.py b/metrics/api/views/charts/single_category_charts.py index 29453f3e7..43fbe8623 100644 --- a/metrics/api/views/charts/single_category_charts.py +++ b/metrics/api/views/charts/single_category_charts.py @@ -8,10 +8,9 @@ from rest_framework.response import Response from rest_framework.views import APIView -from common.auth.cognito_jwt import JSONWebTokenAuthentication - import config from caching.private_api.decorators import cache_response +from common.auth.cognito_jwt import JSONWebTokenAuthentication from metrics.api.decorators.auth import require_authorisation from metrics.api.enums import AppMode from metrics.api.serializers import ChartsSerializer @@ -29,6 +28,7 @@ CHARTS_API_TAG = "charts" + class ChartsView(APIView): permission_classes = [] @@ -343,6 +343,8 @@ def post(cls, request, *args, **kwargs): serializer.is_valid(raise_exception=True) except (InvalidPlotParametersError, DataNotFoundForAnyPlotError) as error: - return Response(status=HTTPStatus.BAD_REQUEST, data={"error_message": str(error)}) + return Response( + status=HTTPStatus.BAD_REQUEST, data={"error_message": str(error)} + ) return Response(data=serializer.data) diff --git a/metrics/data/managers/core_models/headline.py b/metrics/data/managers/core_models/headline.py index 3f6fb60a1..6e880172d 100644 --- a/metrics/data/managers/core_models/headline.py +++ b/metrics/data/managers/core_models/headline.py @@ -16,9 +16,8 @@ validate_permissions_for_non_public, ) from metrics.utils.permissions import ( - check_permissions_hierarchy, + check_if_any_permissions_allow_access, ) -from metrics.data.models.core_models.supporting import Topic class CoreHeadlineQuerySet(models.QuerySet): @@ -390,37 +389,31 @@ def query_for_data( Examples: """ + rbac_permissions = rbac_permissions or [] jwt_permissions = jwt_permissions or {} - # Check JWT permissions first (new authorization, takes precedence) + # Only allow access, if permissions checks below are passed has_access_to_non_public_data: bool = False + # Check JWT permissions first (new authorization takes precedence) if jwt_permissions: has_global_access = jwt_permissions.get("has_global_access", False) + if has_global_access: has_access_to_non_public_data = True else: - permission_set_hierarchy = jwt_permissions.get( - "permission_set_hierarchy", [] + has_access_to_non_public_data = check_if_any_permissions_allow_access( + jwt_permissions=jwt_permissions, + theme=theme, + sub_theme=sub_theme, + topic=topic, + metric=metric, + geography_type=geography_type, + geography=geography, ) - topic_record = Topic.objects.filter( - name=topic, - sub_theme__name=sub_theme, - sub_theme__theme__name=theme, - ).first() - - if topic_record: - has_access_to_non_public_data = check_permissions_hierarchy( - user_permission_hierarchy=permission_set_hierarchy, - theme_id=str(topic_record.sub_theme.theme_id), - sub_theme_id=str(topic_record.sub_theme_id), - topic_id=str(topic_record.id), - ) - else: - has_access_to_non_public_data = False else: - # LEGACY PERMISSIONS (NOT IN USE) - Fallback to RBAC if no JWT permissions + # TODO: THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE has_access_to_non_public_data = validate_permissions_for_non_public( theme=theme, sub_theme=sub_theme, diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index 5bda7e79f..6027430a4 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -17,11 +17,10 @@ is_public_data_only_enforced, validate_permissions_for_non_public, ) +from metrics.data.models import RBACPermission from metrics.utils.permissions import ( - check_permissions_hierarchy, + check_if_any_permissions_allow_access, ) -from metrics.data.models.core_models.supporting import Topic -from metrics.data.models import RBACPermission ALLOWABLE_METRIC_VALUE_RANGE_TYPE = tuple[str | float | int, str | float | int] @@ -619,49 +618,31 @@ def query_for_data( ]>` """ + rbac_permissions: Iterable[RBACPermission] = rbac_permissions or [] jwt_permissions = jwt_permissions or {} # Only allow access, if permissions checks below are passed has_access_to_non_public_data: bool = False - # Check JWT permissions first (new authorization, takes precedence) + # Check JWT permissions first (new authorization takes precedence) if jwt_permissions: has_global_access = jwt_permissions.get("has_global_access", False) if has_global_access: has_access_to_non_public_data = True else: - # TODO: All the below is to be replaces by a similar function as - # check_permissions() that returns all the filter columns - # but removes the "-1" filter columns - permission_set_hierarchy = jwt_permissions.get( - "permission_set_hierarchy", [] + has_access_to_non_public_data = check_if_any_permissions_allow_access( + jwt_permissions=jwt_permissions, + theme=theme, + sub_theme=sub_theme, + topic=topic, + metric=metric, + geography_type=geography_type, + geography=geography, ) - - topic_record = None - - if theme and sub_theme: - topic_record = Topic.objects.filter( - name=topic, - sub_theme__name=sub_theme, - sub_theme__theme__name=theme, - ).first() - - if topic_record is None: - topic_record = Topic.objects.filter(name=topic).first() - - if topic_record: - has_access_to_non_public_data = check_permissions_hierarchy( - user_permission_hierarchy=permission_set_hierarchy, - theme_id=str(topic_record.sub_theme.theme_id), - sub_theme_id=str(topic_record.sub_theme_id), - topic_id=str(topic_record.id), - ) - else: - has_access_to_non_public_data = False else: - # LEGACY PERMISSIONS (NOT IN USE) - Fallback to RBAC if no JWT permissions + # TODO: THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE has_access_to_non_public_data = validate_permissions_for_non_public( theme=theme, sub_theme=sub_theme, diff --git a/metrics/domain/models/charts/common.py b/metrics/domain/models/charts/common.py index ed4b28053..317301450 100644 --- a/metrics/domain/models/charts/common.py +++ b/metrics/domain/models/charts/common.py @@ -24,32 +24,6 @@ class BaseChartRequestParams(BaseModel): class Config: arbitrary_types_allowed = True - # LEGACY PERMISSIONS (NOT IN USE): RBAC-based permissions will be removed in a future release @property def rbac_permissions(self) -> Iterable["RBACPermission"]: return getattr(self.request, "rbac_permissions", []) - - @property - def jwt_permissions(self) -> dict: - """Extract JWT permissions from the authenticated request. - - Returns a dict with: - - 'has_global_access' (bool): Whether user has access to all data - - 'permission_set_hierarchy' (list): User's permission hierarchy from JWT - - Returns empty dict if not authenticated or no permissions available. - """ - if self.request is None: - return {} - - user = getattr(self.request, "user", None) - if user is None: - return {} - - permission_sets = getattr(user, "permission_sets", {}) - if not permission_sets: - return {} - - return permission_sets - - diff --git a/metrics/interfaces/plots/access.py b/metrics/interfaces/plots/access.py index 4e6815019..d0ca77d88 100644 --- a/metrics/interfaces/plots/access.py +++ b/metrics/interfaces/plots/access.py @@ -7,6 +7,7 @@ from django.db.models import Manager, QuerySet from pydantic import BaseModel +from common.auth.cognito_jwt.user_manager import extract_jwt_permissions from metrics.api.settings import auth from metrics.data.models.core_models import CoreTimeSeries, Topic from metrics.domain.common.utils import ChartAxisFields @@ -162,8 +163,9 @@ def get_queryset_from_core_model_manager( plot_params["fields_to_export"].append("lower_confidence") return self.core_model_manager.query_for_data( - **plot_params, rbac_permissions=self.chart_request_params.rbac_permissions, - jwt_permissions=self.chart_request_params.jwt_permissions, + **plot_params, + rbac_permissions=self.chart_request_params.rbac_permissions, # TODO: legacy permissions to be removed + jwt_permissions=extract_jwt_permissions(request=self.chart_request_params.request), # new permissions ) def build_plot_data_from_parameters_with_complete_queryset( diff --git a/metrics/utils/permission_hierarchy.py b/metrics/utils/permission_hierarchy.py index 41ad23dda..895798ff5 100644 --- a/metrics/utils/permission_hierarchy.py +++ b/metrics/utils/permission_hierarchy.py @@ -26,13 +26,13 @@ def convert_permission_set_into_hierarchy(raw_permission_sets: dict) -> dict: Convert a "permission_set" back into a "permission_set_hierarchy" again (the NormalizedPermission class does it the other way round) - @param raw_permission_sets {dict} example: + @param {dict} raw_permission_sets example: { "permission_sets": [ { "theme": {"id": "100", "name": "immunisation"}, "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - "topic": {"id": None, "name": None}, + "topic": {"id": "-1", "name": "* (All)"}, } ], "summary": { @@ -46,7 +46,7 @@ def convert_permission_set_into_hierarchy(raw_permission_sets: dict) -> dict: { "theme": {"id": "100", "name": "immunisation"}, "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - "topic": {"id": None, "name": None}, + "topic": {"id": "-1", "name": "* (All)"}, } ], "has_global_access": False, diff --git a/metrics/utils/permissions.py b/metrics/utils/permissions.py index ca65631c9..34874a8e8 100644 --- a/metrics/utils/permissions.py +++ b/metrics/utils/permissions.py @@ -1,70 +1,289 @@ -"""JWT-based permission validation for non-public data access.""" +""" +Non-public permission validation, filtering and matching functions +""" -from typing import TypedDict +import logging +from typing import Literal, NotRequired, TypedDict + +from metrics.data.models.core_models.supporting import ( + Geography, + GeographyType, + Metric, + Topic, +) + +logger = logging.getLogger(__name__) + +# Our permission resources, e.g. anything that you can filter by permission +PermissionFilterResource = Literal[ + "theme", + "sub_theme", + "topic", + "metric", + "geography_type", + "geography", +] class PermissionSetLevel(TypedDict): - """Represents a level (theme, sub_theme, or topic) in the permission hierarchy.""" - id: str + """ + The "id" is the actual permission. + The "name" is just some blurb describing it. + """ + + id: str # can be "-1", therefore a string name: str class PermissionHierarchy(TypedDict): - """Represents a single permission entry in the user's permission set hierarchy.""" - theme: PermissionSetLevel - sub_theme: PermissionSetLevel - topic: PermissionSetLevel + """ + Our permission resources, e.g. anything that you can filter by permission. + Each one of them is optional, but at least one of them has to be provided. + """ + + theme: NotRequired[PermissionSetLevel] + sub_theme: NotRequired[PermissionSetLevel] + topic: NotRequired[PermissionSetLevel] + metric: NotRequired[PermissionSetLevel] + geography_type: NotRequired[PermissionSetLevel] + geography: NotRequired[PermissionSetLevel] -def check_permissions_hierarchy( +def check_if_any_permissions_allow_access( *, - user_permission_hierarchy: list[PermissionHierarchy] | None, - theme_id: str, - sub_theme_id: str, - topic_id: str, + jwt_permissions: dict | None, + theme: str = "", + sub_theme: str = "", + topic: str, + metric: str, + geography_type: str, + geography: str, ) -> bool: """ - Checks if the user's JWT permissions allow access to the requested non-public data. + This is our CORE PERMISSION-CHECKING function. + + Resolve request names to IDs and return whether any + of the passed API filters are being satisfied through + the user's permissions defined in the jwt_permissions. + + Any of our 6 passed permission fields is optional. If + none of them is set, no permission is given. + + @param {dict | None} jwt_permissions, eg: + { + "permission_set_hierarchy": [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, + } + ] + } + + @return {bool} """ - if not user_permission_hierarchy: - return False + topic_record = None + if topic: + if theme and sub_theme: + topic_record = Topic.objects.filter( + name=topic, + sub_theme__name=sub_theme, + sub_theme__theme__name=theme, + ).first() + if topic_record is None: + topic_record = Topic.objects.filter(name=topic).first() - return check_permissions( - user_permission_hierarchy, - theme_id, - sub_theme_id, - topic_id, + metric_record = None + if metric: + metric_record = Metric.objects.filter(name=metric).first() + + geography_type_record = None + if geography_type: + geography_type_record = GeographyType.objects.filter( + name=geography_type + ).first() + + geography_record = None + if geography: + geography_queryset = Geography.objects.filter(name=geography) + if geography_type_record: + geography_queryset = geography_queryset.filter( + geography_type_id=geography_type_record.id + ) + geography_record = geography_queryset.first() + + requested_filters: dict[PermissionFilterResource, str | None] = { + "theme": str(topic_record.sub_theme.theme_id) if topic_record else None, + "sub_theme": str(topic_record.sub_theme_id) if topic_record else None, + "topic": str(topic_record.id) if topic_record else None, + "metric": str(metric_record.id) if metric_record else None, + "geography_type": ( + str(geography_type_record.id) if geography_type_record else None + ), + "geography": ( + str(geography_record.geography_code) + if geography_record and geography_record.geography_code + else None + ), + } + + # Don't pass on any rubbish + normalized_requested_filters: dict[PermissionFilterResource, str] = { + key: str(value) + for key, value in requested_filters.items() + if _permission_value_is_valid(value) + } + + matching_permissions = filter_permissions( + jwt_permissions=jwt_permissions, + requested_filters=normalized_requested_filters, ) + return bool(matching_permissions) + -def check_permissions(user_permissions, theme_id, sub_theme_id, topic_id) -> bool: +def filter_permissions( + *, + jwt_permissions: dict | None, + requested_filters: dict[PermissionFilterResource, str], +) -> list[dict[str, str]]: """ - TODO: This is an exact copy of cms/dashboard/viewsets.py:check_permissions(), - which needs to be centralized! + Filter permissions that match the requested IDs. + Returns a list of matching permissions containing IDs (without wildcards "-1"). - @return True/False whether access allowed or not + @param {dict | None} jwt_permissions, eg: + { + "permission_set_hierarchy": [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "topic": {"id": "215", "name": "MMR1"}, + } + ] + } + + @param {dict} requested_filters, eg: + {"theme": "100", "sub_theme": "133", "topic": "114"} + + @return {list}, eg: + [{"theme": "100", "sub_theme": "133"}] """ - if not isinstance(user_permissions, list): + if not isinstance( + permission_set_hierarchy := (jwt_permissions or {}).get( + "permission_set_hierarchy" + ), + list, + ): + return [] + + if not requested_filters: + return [] + + matching_permissions: list[dict[str, str]] = [] + + for permission in permission_set_hierarchy: + if not isinstance(permission, dict) or not permission: + continue + + concrete_filters: dict[str, str] = {} + + # Permission row must satisfy all requested dimensions, not just some + for filter_key, requested_value in requested_filters.items(): + permission_value = _extract_permission_id( + permission=permission, + filter_key=filter_key, + ) + + if permission_value == requested_value: + # Exact permission match also grants access on this dimension. + concrete_filters[filter_key] = requested_value + continue + if permission_value == "-1": + # Wildcard grants access on this dimension. + concrete_filters[filter_key] = requested_value + continue + + # This permission row did not satisfy all requested filters + break + else: + # Only runs if loop didn't break -> full match. + matching_permissions.append(concrete_filters) + + return matching_permissions + + +def _extract_permission_id( + *, permission: dict[str, object], filter_key: PermissionFilterResource +) -> str | None: + """ + Extract a permission id for one filter key from a permission entry. + + @param {dict} permission, eg: + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, + } + + @param {str} filter_key, eg: + "sub_theme" + + @return {str | None}, eg: + "133" + """ + + # Do we offer permissions on this resource? + if filter_key not in permission: + return None + + # Is permission resource present? + permission_resource = permission.get(filter_key) + if not permission_resource: + return None + + # Permission resource malformed? + if not isinstance(permission_resource, dict): + return None + + # Does it have a meaningful permission value? + permission_id = permission_resource.get("id") + if not _permission_value_is_valid(permission_id): + return None + + # Return ID, if all is fine until here + return str(permission_id) + + +def _permission_value_is_valid(value: object) -> bool: + """ + Is value a valid permission ID? + + A valid permission value is an integer-like value except 0. + Integers that exist as string types are fine, eg "5" instead of 5. + The wildcard "-1" remains also valid. + """ + if value is None: return False - for permission in user_permissions: - permission_theme_id = permission.get("theme", {}).get("id") - permission_sub_theme_id = permission.get("sub_theme", {}).get("id") - permission_topic_id = permission.get("topic", {}).get("id") + if isinstance(value, bool): + return False - if permission_theme_id == "-1": - return True + value_as_text = str(value).strip() - if permission_theme_id == theme_id and permission_sub_theme_id == "-1": - return True + if not value_as_text: + return False + + try: + numeric_value = int(value_as_text) + except (TypeError, ValueError): + return False - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == sub_theme_id - and (permission_topic_id == "-1" or permission_topic_id == topic_id) - ): - return True + return numeric_value != 0 - return False + +class InvalidPermissionHierarchyError(ValueError): + """ + Raised when a JWT permission hierarchy contains invalid values + """ From 28ec2ac7181cf06cfee90aff4a9789996ebc1c21 Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Wed, 20 May 2026 10:18:56 +0100 Subject: [PATCH 4/5] CDD-3173: remove TODO comments --- common/auth/cognito_jwt/user_manager.py | 2 +- metrics/data/managers/core_models/headline.py | 2 +- metrics/data/managers/core_models/time_series.py | 2 +- metrics/interfaces/plots/access.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py index 8cc5777b0..b4d134164 100644 --- a/common/auth/cognito_jwt/user_manager.py +++ b/common/auth/cognito_jwt/user_manager.py @@ -26,7 +26,7 @@ def get_or_create_for_cognito(jwt_payload): username = jwt_payload["entraObjectId"] raw_permission_sets = jwt_payload["permissionSets"] - # TODO: Testing + # Manual testing # username = "678a605b-16f3-4342-9f02-db74613701ac" # raw_permission_sets = { # "permission_sets": [ diff --git a/metrics/data/managers/core_models/headline.py b/metrics/data/managers/core_models/headline.py index 6e880172d..ba02a212a 100644 --- a/metrics/data/managers/core_models/headline.py +++ b/metrics/data/managers/core_models/headline.py @@ -413,7 +413,7 @@ def query_for_data( geography=geography, ) else: - # TODO: THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE + # THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE has_access_to_non_public_data = validate_permissions_for_non_public( theme=theme, sub_theme=sub_theme, diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index 6027430a4..d9c089111 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -642,7 +642,7 @@ def query_for_data( geography=geography, ) else: - # TODO: THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE + # THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE has_access_to_non_public_data = validate_permissions_for_non_public( theme=theme, sub_theme=sub_theme, diff --git a/metrics/interfaces/plots/access.py b/metrics/interfaces/plots/access.py index d0ca77d88..35ddfaf93 100644 --- a/metrics/interfaces/plots/access.py +++ b/metrics/interfaces/plots/access.py @@ -164,7 +164,7 @@ def get_queryset_from_core_model_manager( return self.core_model_manager.query_for_data( **plot_params, - rbac_permissions=self.chart_request_params.rbac_permissions, # TODO: legacy permissions to be removed + rbac_permissions=self.chart_request_params.rbac_permissions, # legacy permissions to be removed jwt_permissions=extract_jwt_permissions(request=self.chart_request_params.request), # new permissions ) From 41844307416b9483d079ac61b185442f8ee7b50d Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Wed, 20 May 2026 11:23:16 +0100 Subject: [PATCH 5/5] CDD-3173: rename and remove redundant things --- common/auth/cognito_jwt/backend.py | 5 +- common/auth/cognito_jwt/user_manager.py | 19 ++-- metrics/data/managers/core_models/headline.py | 21 ++--- .../data/managers/core_models/time_series.py | 12 +-- metrics/utils/permission_hierarchy.py | 12 +-- metrics/utils/permissions.py | 91 ++++++++++--------- 6 files changed, 78 insertions(+), 82 deletions(-) diff --git a/common/auth/cognito_jwt/backend.py b/common/auth/cognito_jwt/backend.py index 6ac2b5415..b27462767 100644 --- a/common/auth/cognito_jwt/backend.py +++ b/common/auth/cognito_jwt/backend.py @@ -19,9 +19,9 @@ def get_authorization_header(request): """ Return request's authentication header, as a bytestring. + Hide some test client ickyness where the header can be unicode. """ - auth_header = getattr(settings, "COGNITO_JWT_AUTH_HEADER", "Authorization") auth = request.META.get(auth_header, b"") if isinstance(auth, str): @@ -31,8 +31,7 @@ def get_authorization_header(request): class JSONWebTokenAuthentication(BaseAuthentication): - """ - Token based authentication using the JSON Web Token standard. + """Token based authentication using the JSON Web Token standard. Based on https://github.com/labd/django-cognito-jwt and modified to suit our use case """ diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py index b4d134164..0f2c0bb43 100644 --- a/common/auth/cognito_jwt/user_manager.py +++ b/common/auth/cognito_jwt/user_manager.py @@ -6,7 +6,7 @@ from metrics.utils.permission_hierarchy import convert_permission_set_into_hierarchy -if TYPE_CHECKING: +if TYPE_CHECKING: # just for IDE checks from rest_framework.request import Request logger = logging.getLogger(__name__) @@ -16,8 +16,7 @@ class CognitoManager(BaseUserManager): @staticmethod def get_or_create_for_cognito(jwt_payload): - """ - Create an ephemeral user instance for this request. + """Create an ephemeral user instance for this request. We don't need to store or retrieve any info, we use what's in the JWT, so this speeds up the request by removing the need for any DB access """ @@ -26,14 +25,14 @@ def get_or_create_for_cognito(jwt_payload): username = jwt_payload["entraObjectId"] raw_permission_sets = jwt_payload["permissionSets"] - # Manual testing + # Manual testing (just for now) # username = "678a605b-16f3-4342-9f02-db74613701ac" # raw_permission_sets = { # "permission_sets": [ # { # "theme": {"id": "100", "name": "immunisation"}, - # "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - # "topic": {"id": "-1", "name": None}, + # "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + # "topic": {"id": "-1", "name": "* (All)"}, # } # ], # "summary": {"has_global_access": False}, @@ -52,9 +51,7 @@ def get_or_create_for_cognito(jwt_payload): logger.info( "JWT token for user '%s' with permissions: permission_count=%d, has_global_access=%s", - username, - permission_count, - has_global_access, + username, permission_count, has_global_access, ) user_class = get_user_model() @@ -72,10 +69,10 @@ def extract_jwt_permissions(*, request: "Request | None") -> dict: during JWT authentication. Lives here because it is the counterpart to the code above that "writes" user.permission_sets. - @param {Request | None} request example: + @param {Request | None} request, eg: - @return {dict} example: + @return {dict}, eg: { "permission_set_hierarchy": [ {"theme": {"id": "100", "name": "immunisation"}, ...} diff --git a/metrics/data/managers/core_models/headline.py b/metrics/data/managers/core_models/headline.py index ba02a212a..14ecd01c5 100644 --- a/metrics/data/managers/core_models/headline.py +++ b/metrics/data/managers/core_models/headline.py @@ -16,7 +16,7 @@ validate_permissions_for_non_public, ) from metrics.utils.permissions import ( - check_if_any_permissions_allow_access, + check_any_permissions_allow_access, ) @@ -368,7 +368,7 @@ def query_for_data( Note that options are `M`, `F`, or `ALL`. age: The age range to apply additional filtering to. E.g. `0_4` would be used to capture the age of 0-4 years old - theme: The name of the theme being queried. + theme: The name of the theme being queried. This is only used to determine permissions for the non-public portion of the requested dataset. sub_theme: The name of the sub theme being queried. @@ -377,9 +377,10 @@ def query_for_data( rbac_permissions: The RBAC permissions available to the given request. This dictates whether the given request is permitted access to non-public data or not. - jwt_permissions: JWT permissions dict extracted from Cognito token. - Contains 'has_global_access' (bool) and 'permission_set_hierarchy' (list). - Used for new JWT-based authorization (takes precedence over RBAC permissions). + The new JWT-based authorization below takes precedence + over RBAC permissions, which is not in use anymore. + jwt_permissions: The JWT permissions extracted from the Cognito token. + Contains 'permission_set_hierarchy' (list) and 'has_global_access' (bool). Returns: Queryset of (x_axis, y_axis) where x_axis represents the variable on the x_axis @@ -391,19 +392,17 @@ def query_for_data( """ rbac_permissions = rbac_permissions or [] - jwt_permissions = jwt_permissions or {} - # Only allow access, if permissions checks below are passed - has_access_to_non_public_data: bool = False + has_access_to_non_public_data: bool - # Check JWT permissions first (new authorization takes precedence) if jwt_permissions: + # Check JWT permissions first (new authorization takes precedence) has_global_access = jwt_permissions.get("has_global_access", False) if has_global_access: has_access_to_non_public_data = True else: - has_access_to_non_public_data = check_if_any_permissions_allow_access( + has_access_to_non_public_data = check_any_permissions_allow_access( jwt_permissions=jwt_permissions, theme=theme, sub_theme=sub_theme, @@ -413,7 +412,7 @@ def query_for_data( geography=geography, ) else: - # THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE + # Legacy RBAC permissions (not in use) (to be removed in a future release) has_access_to_non_public_data = validate_permissions_for_non_public( theme=theme, sub_theme=sub_theme, diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index d9c089111..2578429f6 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -19,7 +19,7 @@ ) from metrics.data.models import RBACPermission from metrics.utils.permissions import ( - check_if_any_permissions_allow_access, + check_any_permissions_allow_access, ) ALLOWABLE_METRIC_VALUE_RANGE_TYPE = tuple[str | float | int, str | float | int] @@ -620,19 +620,17 @@ def query_for_data( """ rbac_permissions: Iterable[RBACPermission] = rbac_permissions or [] - jwt_permissions = jwt_permissions or {} - # Only allow access, if permissions checks below are passed - has_access_to_non_public_data: bool = False + has_access_to_non_public_data: bool - # Check JWT permissions first (new authorization takes precedence) if jwt_permissions: + # Check JWT permissions first (new authorization takes precedence) has_global_access = jwt_permissions.get("has_global_access", False) if has_global_access: has_access_to_non_public_data = True else: - has_access_to_non_public_data = check_if_any_permissions_allow_access( + has_access_to_non_public_data = check_any_permissions_allow_access( jwt_permissions=jwt_permissions, theme=theme, sub_theme=sub_theme, @@ -642,7 +640,7 @@ def query_for_data( geography=geography, ) else: - # THESE LEGACY RBAC PERMISSIONS ARE NOT IN USE AND TO BE REMOVED IN A FUTURE RELEASE + # Legacy RBAC permissions (not in use) (to be removed in a future release) has_access_to_non_public_data = validate_permissions_for_non_public( theme=theme, sub_theme=sub_theme, diff --git a/metrics/utils/permission_hierarchy.py b/metrics/utils/permission_hierarchy.py index 895798ff5..cefed1e78 100644 --- a/metrics/utils/permission_hierarchy.py +++ b/metrics/utils/permission_hierarchy.py @@ -26,13 +26,13 @@ def convert_permission_set_into_hierarchy(raw_permission_sets: dict) -> dict: Convert a "permission_set" back into a "permission_set_hierarchy" again (the NormalizedPermission class does it the other way round) - @param {dict} raw_permission_sets example: + @param {dict} raw_permission_sets, eg: { "permission_sets": [ { "theme": {"id": "100", "name": "immunisation"}, - "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - "topic": {"id": "-1", "name": "* (All)"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "215", "name": "MMR1"}, } ], "summary": { @@ -40,13 +40,13 @@ def convert_permission_set_into_hierarchy(raw_permission_sets: dict) -> dict: }, } - @return {dict} example: + @return {dict}, eg: { "permission_set_hierarchy": [ { "theme": {"id": "100", "name": "immunisation"}, - "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - "topic": {"id": "-1", "name": "* (All)"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "215", "name": "MMR1"}, } ], "has_global_access": False, diff --git a/metrics/utils/permissions.py b/metrics/utils/permissions.py index 34874a8e8..6c5329dd0 100644 --- a/metrics/utils/permissions.py +++ b/metrics/utils/permissions.py @@ -2,7 +2,6 @@ Non-public permission validation, filtering and matching functions """ -import logging from typing import Literal, NotRequired, TypedDict from metrics.data.models.core_models.supporting import ( @@ -12,8 +11,6 @@ Topic, ) -logger = logging.getLogger(__name__) - # Our permission resources, e.g. anything that you can filter by permission PermissionFilterResource = Literal[ "theme", @@ -31,7 +28,7 @@ class PermissionSetLevel(TypedDict): The "name" is just some blurb describing it. """ - id: str # can be "-1", therefore a string + id: str # can be "-1" and therefore a string name: str @@ -49,9 +46,9 @@ class PermissionHierarchy(TypedDict): geography: NotRequired[PermissionSetLevel] -def check_if_any_permissions_allow_access( +def check_any_permissions_allow_access( *, - jwt_permissions: dict | None, + jwt_permissions: dict, theme: str = "", sub_theme: str = "", topic: str, @@ -66,20 +63,30 @@ def check_if_any_permissions_allow_access( of the passed API filters are being satisfied through the user's permissions defined in the jwt_permissions. - Any of our 6 passed permission fields is optional. If - none of them is set, no permission is given. + Any of the 6 of the passed API filters are optional. If + none of them is passed, return false, cause at this point + we already know user has_global_access=False - @param {dict | None} jwt_permissions, eg: + @param {dict} jwt_permissions, eg: { "permission_set_hierarchy": [ { "theme": {"id": "100", "name": "immunisation"}, - "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, "topic": {"id": "-1", "name": "* (All)"}, } ] } + @param {str} theme, eg: + "immunisation" + + @param {str} sub_theme, eg: + "childhood-vaccines" + + @param {str} topic, eg: + "MMR1" + @return {bool} """ @@ -132,7 +139,7 @@ def check_if_any_permissions_allow_access( normalized_requested_filters: dict[PermissionFilterResource, str] = { key: str(value) for key, value in requested_filters.items() - if _permission_value_is_valid(value) + if _is_permission_value_valid(value) } matching_permissions = filter_permissions( @@ -145,39 +152,38 @@ def check_if_any_permissions_allow_access( def filter_permissions( *, - jwt_permissions: dict | None, + jwt_permissions: dict, requested_filters: dict[PermissionFilterResource, str], ) -> list[dict[str, str]]: """ Filter permissions that match the requested IDs. Returns a list of matching permissions containing IDs (without wildcards "-1"). - @param {dict | None} jwt_permissions, eg: + @param {dict} jwt_permissions, eg: { "permission_set_hierarchy": [ { "theme": {"id": "100", "name": "immunisation"}, - "sub_theme": {"id": "133", "name": "childhood-vaccines"}, - "topic": {"id": "215", "name": "MMR1"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, } ] } @param {dict} requested_filters, eg: - {"theme": "100", "sub_theme": "133", "topic": "114"} + {"theme": "100", "sub_theme": "200", "topic": "300"} @return {list}, eg: - [{"theme": "100", "sub_theme": "133"}] + [{"theme": "100", "sub_theme": "200"}] """ - if not isinstance( - permission_set_hierarchy := (jwt_permissions or {}).get( - "permission_set_hierarchy" - ), - list, - ): + # Gotta have both of them to match permissions, cause at + # this point we already know user has_global_access=False + permission_set_hierarchy = jwt_permissions.get( + "permission_set_hierarchy" + ) + if not isinstance(permission_set_hierarchy, list): return [] - if not requested_filters: return [] @@ -189,7 +195,7 @@ def filter_permissions( concrete_filters: dict[str, str] = {} - # Permission row must satisfy all requested dimensions, not just some + # All the requested filters must be present in the permission row or "-1" for filter_key, requested_value in requested_filters.items(): permission_value = _extract_permission_id( permission=permission, @@ -197,25 +203,27 @@ def filter_permissions( ) if permission_value == requested_value: - # Exact permission match also grants access on this dimension. + # Exact permission match grants access concrete_filters[filter_key] = requested_value continue - if permission_value == "-1": - # Wildcard grants access on this dimension. + elif permission_value == "-1": + # Wildcard also grants access concrete_filters[filter_key] = requested_value continue - # This permission row did not satisfy all requested filters + # This permission row did not satisfy all the requested filters break else: - # Only runs if loop didn't break -> full match. + # Only runs if loop didn't break -> full match matching_permissions.append(concrete_filters) return matching_permissions def _extract_permission_id( - *, permission: dict[str, object], filter_key: PermissionFilterResource + *, + permission: dict[str, object], + filter_key: PermissionFilterResource ) -> str | None: """ Extract a permission id for one filter key from a permission entry. @@ -223,7 +231,7 @@ def _extract_permission_id( @param {dict} permission, eg: { "theme": {"id": "100", "name": "immunisation"}, - "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, "topic": {"id": "-1", "name": "* (All)"}, } @@ -231,14 +239,14 @@ def _extract_permission_id( "sub_theme" @return {str | None}, eg: - "133" + "200" """ # Do we offer permissions on this resource? if filter_key not in permission: return None - # Is permission resource present? + # Permission resource present? permission_resource = permission.get(filter_key) if not permission_resource: return None @@ -247,16 +255,16 @@ def _extract_permission_id( if not isinstance(permission_resource, dict): return None - # Does it have a meaningful permission value? + # Permission value meaningful? permission_id = permission_resource.get("id") - if not _permission_value_is_valid(permission_id): + if not _is_permission_value_valid(permission_id): return None - # Return ID, if all is fine until here + # If all is fine up to here, return the ID return str(permission_id) -def _permission_value_is_valid(value: object) -> bool: +def _is_permission_value_valid(value: object) -> bool: """ Is value a valid permission ID? @@ -264,6 +272,7 @@ def _permission_value_is_valid(value: object) -> bool: Integers that exist as string types are fine, eg "5" instead of 5. The wildcard "-1" remains also valid. """ + if value is None: return False @@ -281,9 +290,3 @@ def _permission_value_is_valid(value: object) -> bool: return False return numeric_value != 0 - - -class InvalidPermissionHierarchyError(ValueError): - """ - Raised when a JWT permission hierarchy contains invalid values - """