From 8708ada9dacf38df76b2dd2cd93953e46b683fa3 Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Wed, 27 May 2026 17:11:20 +0100 Subject: [PATCH 1/6] CDD-3173: prototype authorization curl call on /api/downloads/v2 --- cms/auth_content/auth_utils.py | 74 ++++++++++++++++++- cms/auth_content/constants.py | 2 + metrics/api/middleware/__init__.py | 0 metrics/api/middleware/sql_debug.py | 24 ++++++ metrics/api/settings/local.py | 1 + .../data/managers/core_models/time_series.py | 56 +++++++++----- metrics/domain/models/charts/common.py | 8 ++ metrics/interfaces/plots/access.py | 6 +- 8 files changed, 151 insertions(+), 20 deletions(-) create mode 100644 metrics/api/middleware/__init__.py create mode 100644 metrics/api/middleware/sql_debug.py diff --git a/cms/auth_content/auth_utils.py b/cms/auth_content/auth_utils.py index 56735c2da..ff2bb2491 100644 --- a/cms/auth_content/auth_utils.py +++ b/cms/auth_content/auth_utils.py @@ -1,9 +1,81 @@ +import logging from collections.abc import Callable - +# from cms.auth_content.constants import WILDCARD_ID_VALUE, WILDCARD_NAME_VALUES from django import forms from cms.dynamic_content import help_texts +WILDCARD_ID_VALUE = "-1" +WILDCARD_NAME_VALUES = ["* (All themes)", "* (All sub-themes)", "* (All topics)"] + +logger = logging.getLogger(__name__) + +def check_permissions(user_permissions, theme_id, sub_theme_id, topic_id) -> bool: + if not isinstance(user_permissions, list): + return False + + for permission in user_permissions: + permission_theme_id = permission.get("theme", {}).get("id") + permission_sub_theme_id = permission.get("sub_theme", {}).get("id") + permission_topic_id = permission.get("topic", {}).get("id") + + if permission_theme_id == WILDCARD_ID_VALUE: + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == WILDCARD_ID_VALUE + ): + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and (permission_topic_id in {WILDCARD_ID_VALUE, topic_id}) + ): + return True + + return False + + +def check_permissions_by_name(permission_sets, theme_name, sub_theme_name, topic_name) -> bool: + if not isinstance(permission_sets, dict): + return False + if not isinstance(permission_sets.get("permission_sets"), list): + return False + if not isinstance(permission_sets.get("summary"), dict): + return False + if not isinstance(permission_sets.get("summary").get("has_global_access"), bool): + return False + + logger.info(f'Entered check_permissions_by_name() with theme "{theme_name}" and sub_theme "{sub_theme_name}" and topic "{topic_name}"') + + if permission_sets.get("summary").get("has_global_access"): + return True + else: + for permission in permission_sets.get("permission_sets"): + permission_theme_name = permission.get("theme", {}).get("name") + permission_sub_theme_name = permission.get("sub_theme", {}).get("name") + permission_topic_name = permission.get("topic", {}).get("name") + + if permission_theme_name in WILDCARD_NAME_VALUES: + return True + + if ( + permission_theme_name == theme_name + and permission_sub_theme_name in WILDCARD_NAME_VALUES + ): + return True + + if ( + permission_theme_name == theme_name + and permission_sub_theme_name == sub_theme_name + and (permission_topic_name in WILDCARD_NAME_VALUES + [topic_name]) + ): + return True + + return False + def _create_form_field( field: dict[str, str | Callable | None], wildcard_id_value=None diff --git a/cms/auth_content/constants.py b/cms/auth_content/constants.py index 0920faa1f..9c04bcf3d 100644 --- a/cms/auth_content/constants.py +++ b/cms/auth_content/constants.py @@ -4,6 +4,8 @@ ) WILDCARD_ID_VALUE = "-1" +WILDCARD_NAME_VALUES = ["* (All themes)", "* (All sub-themes)", "* (All topics)"] + PERMISSION_SET_FIELDS = [ { "field_name": "theme", diff --git a/metrics/api/middleware/__init__.py b/metrics/api/middleware/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/metrics/api/middleware/sql_debug.py b/metrics/api/middleware/sql_debug.py new file mode 100644 index 000000000..dc00d0762 --- /dev/null +++ b/metrics/api/middleware/sql_debug.py @@ -0,0 +1,24 @@ +from django.db import connection + + +def _print_sql(execute, sql, params, many, context): + print(f"\n[SQL] {sql}") + if params: + print(f"[PARAMS] {params}") + return execute(sql, params, many, context) + + +class SQLDebugMiddleware: + """ + Middleware that prints the raw SQL and params for every DB query made + during a request/response cycle. + + Only intended for local development — add to MIDDLEWARE in local.py. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + with connection.execute_wrapper(_print_sql): + return self.get_response(request) diff --git a/metrics/api/settings/local.py b/metrics/api/settings/local.py index 8d0cf9636..2a3f10dfa 100644 --- a/metrics/api/settings/local.py +++ b/metrics/api/settings/local.py @@ -29,6 +29,7 @@ MIDDLEWARE += [ "debug_toolbar.middleware.DebugToolbarMiddleware", + "metrics.api.middleware.sql_debug.SQLDebugMiddleware", ] INTERNAL_IPS = ["127.0.0.1"] diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index aed8df7f6..0f948b965 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -6,9 +6,10 @@ """ import datetime +import logging from collections.abc import Iterable from typing import Self - +from cms.auth_content.auth_utils import check_permissions_by_name from django.db import models from django.db.models.query_utils import Q from django.utils import timezone @@ -21,6 +22,7 @@ ALLOWABLE_METRIC_VALUE_RANGE_TYPE = tuple[str | float | int, str | float | int] +logger = logging.getLogger(__name__) class CoreTimeSeriesQuerySet(models.QuerySet): """Custom queryset which can be used by the `CoreTimeSeriesManager`""" @@ -160,6 +162,8 @@ def filter_for_audit_list_view( def query_for_data( self, *, + theme: str, + sub_theme: str, topic: str, metric: str, date_from: datetime.date, @@ -172,7 +176,7 @@ def query_for_data( sex: str | None = None, age: str | None = None, metric_value_ranges: list[tuple[str | float | int]] | None = None, - restrict_to_public: bool = True, + permission_sets: dict, ) -> models.QuerySet: """Filters for a N-item list of dicts by the given params if `fields_to_export` is used. @@ -217,9 +221,7 @@ def query_for_data( i.e. to filter for all record with values between 0 -> 80 AND 90 -> 100, this can be provided as `[(0, 80), (90, 100)]`. - restrict_to_public: Boolean switch to restrict the query - to only return public records. - If False, then non-public records will be included. + permission_sets: The JWT permissions extracted from the Cognito token. Returns: QuerySet: An ordered queryset from lowest -> highest @@ -231,6 +233,8 @@ def query_for_data( ]>` """ + logger.info('Entered query_for_data()') + queryset = self.filter( metric__topic__name=topic, metric__name=metric, @@ -245,9 +249,31 @@ def query_for_data( sex=sex, age=age, ) + public_queryset = queryset.filter( + is_public=True + ) - if restrict_to_public: - queryset = queryset.filter(is_public=True) + if permission_sets: + logger.info('Entered if permission_sets clause') + + if check_permissions_by_name( + permission_sets, + theme, + sub_theme, + topic, + # metric, + # geography_type, + # geography, + ): + logger.info('Entered check_permissions_by_name() if clause') + + queryset = public_queryset + queryset.filter( + is_public=False + ) + else: + logger.info('Entered else permission_sets clause') + + queryset = public_queryset queryset = self._exclude_data_under_embargo(queryset=queryset) queryset = self._filter_for_metric_value_ranges( @@ -533,6 +559,7 @@ def query_for_data( sub_theme: str = "", metric_value_ranges: list[str | float | int] | None = None, rbac_permissions: Iterable[RBACPermission] | None = None, + permission_sets: dict, ) -> CoreTimeSeriesQuerySet: """Filters for a 2-item object by the given params. Slices all values older than the `date_from`. @@ -582,6 +609,7 @@ def query_for_data( rbac_permissions: The RBAC permissions available to the given request. This dictates whether the given request is permitted access to non-public data or not. + permission_sets: The JWT permissions extracted from the Cognito token. Notes: If we have the following input `queryset`: @@ -611,20 +639,12 @@ def query_for_data( ]>` """ - rbac_permissions: Iterable[RBACPermission] = rbac_permissions or [] - has_access_to_non_public_data: bool = validate_permissions_for_non_public( - theme=theme, - sub_theme=sub_theme, - topic=topic, - metric=metric, - geography_type=geography_type, - geography=geography, - rbac_permissions=rbac_permissions, - ) return self.get_queryset().query_for_data( fields_to_export=fields_to_export, field_to_order_by=field_to_order_by, + theme=theme, + sub_theme=sub_theme, topic=topic, metric=metric, date_from=date_from, @@ -635,7 +655,7 @@ def query_for_data( sex=sex, age=age, metric_value_ranges=metric_value_ranges, - restrict_to_public=not has_access_to_non_public_data, + permission_sets=permission_sets, ) def query_for_superseded_data( diff --git a/metrics/domain/models/charts/common.py b/metrics/domain/models/charts/common.py index 317301450..faaa30a0c 100644 --- a/metrics/domain/models/charts/common.py +++ b/metrics/domain/models/charts/common.py @@ -1,3 +1,4 @@ +import logging from collections.abc import Iterable from decimal import Decimal from typing import Literal @@ -5,6 +6,7 @@ from pydantic.main import BaseModel from rest_framework.request import Request +logger = logging.getLogger(__name__) class BaseChartRequestParams(BaseModel): file_format: Literal["png", "svg", "jpg", "jpeg", "json", "csv"] @@ -27,3 +29,9 @@ class Config: @property def rbac_permissions(self) -> Iterable["RBACPermission"]: return getattr(self.request, "rbac_permissions", []) + + @property + def permission_sets(self) -> dict: + logger.info(f'Entered BaseChartRequestParams.permission_sets') + + return getattr(self.request.user, "permission_sets", {}) diff --git a/metrics/interfaces/plots/access.py b/metrics/interfaces/plots/access.py index 6e4eac34d..efad04938 100644 --- a/metrics/interfaces/plots/access.py +++ b/metrics/interfaces/plots/access.py @@ -161,8 +161,12 @@ def get_queryset_from_core_model_manager( plot_params["fields_to_export"].append("upper_confidence") plot_params["fields_to_export"].append("lower_confidence") + logger.info('Entered access.py') + return self.core_model_manager.query_for_data( - **plot_params, rbac_permissions=self.chart_request_params.rbac_permissions + **plot_params, + rbac_permissions=self.chart_request_params.rbac_permissions, + permission_sets = self.chart_request_params.permission_sets, ) def build_plot_data_from_parameters_with_complete_queryset( From e98220f1a4774fdd6adf9eb06bf6d44c05876cf0 Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Thu, 28 May 2026 17:51:23 +0100 Subject: [PATCH 2/6] CDD-3173: get rid of check_permissions_by_name() and make /api/downloads/v2 use check_permissions() instead --- cms/auth_content/auth_utils.py | 144 +++++++++++++----- cms/auth_content/constants.py | 2 - .../data/managers/core_models/geography.py | 30 ++++ .../managers/core_models/geography_type.py | 30 ++++ metrics/data/managers/core_models/metric.py | 30 ++++ .../data/managers/core_models/time_series.py | 20 ++- metrics/data/managers/core_models/topic.py | 50 ++++++ metrics/domain/models/charts/common.py | 12 +- metrics/interfaces/plots/access.py | 4 +- 9 files changed, 273 insertions(+), 49 deletions(-) diff --git a/cms/auth_content/auth_utils.py b/cms/auth_content/auth_utils.py index ff2bb2491..c724e78a3 100644 --- a/cms/auth_content/auth_utils.py +++ b/cms/auth_content/auth_utils.py @@ -1,24 +1,41 @@ import logging from collections.abc import Callable -# from cms.auth_content.constants import WILDCARD_ID_VALUE, WILDCARD_NAME_VALUES -from django import forms +from cms.auth_content.constants import WILDCARD_ID_VALUE +from django import forms from cms.dynamic_content import help_texts - -WILDCARD_ID_VALUE = "-1" -WILDCARD_NAME_VALUES = ["* (All themes)", "* (All sub-themes)", "* (All topics)"] +from metrics.data.models.core_models.supporting import Geography, GeographyType, Metric, Topic logger = logging.getLogger(__name__) -def check_permissions(user_permissions, theme_id, sub_theme_id, topic_id) -> bool: - if not isinstance(user_permissions, list): +def check_permissions( + permission_sets, + theme_id, + sub_theme_id, + topic_id, + metric_id=0, + geography_type=0, + geography_id=0, +) -> bool: + if not isinstance(permission_sets, list): return False - for permission in user_permissions: + # TODO: Can we make them all ID at source? + theme_id = str(theme_id) + sub_theme_id = str(sub_theme_id) + topic_id = str(topic_id) + metric_id = str(metric_id) + geography_type = str(geography_type) + geography_id = str(geography_id) + + for permission in permission_sets: permission_theme_id = permission.get("theme", {}).get("id") permission_sub_theme_id = permission.get("sub_theme", {}).get("id") permission_topic_id = permission.get("topic", {}).get("id") - + permission_metric_id = permission.get("metric", {}).get("id") + permission_geography_type = permission.get("geography_type", {}).get("id") + permission_geography_id = permission.get("geography", {}).get("id") + if permission_theme_id == WILDCARD_ID_VALUE: return True @@ -31,14 +48,86 @@ def check_permissions(user_permissions, theme_id, sub_theme_id, topic_id) -> boo if ( permission_theme_id == theme_id and permission_sub_theme_id == sub_theme_id - and (permission_topic_id in {WILDCARD_ID_VALUE, topic_id}) + and permission_topic_id in {WILDCARD_ID_VALUE, topic_id} + ): + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and permission_topic_id == topic_id + and permission_metric_id in {WILDCARD_ID_VALUE, metric_id} + ): + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and permission_topic_id == topic_id + and permission_metric_id == metric_id + and permission_geography_type in {WILDCARD_ID_VALUE, geography_type} ): return True + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and permission_topic_id == topic_id + and permission_metric_id == metric_id + and permission_geography_type == geography_type + and permission_geography_id in {WILDCARD_ID_VALUE, geography_id} + ): + return True + return False -def check_permissions_by_name(permission_sets, theme_name, sub_theme_name, topic_name) -> bool: + +def check_permissions_by_name( + permission_sets, + theme_name, + sub_theme_name, + topic_name, + metric_name, + geography_type, + geography_name, +) -> bool: + logger.info(f'Entered check_permissions_by_name()') + + theme_id, sub_theme_id, topic_id = Topic.objects.get_id_by_name(theme_name, sub_theme_name, topic_name) + metric_id = Metric.objects.get_id_by_name(metric_name) + geography_type_id = GeographyType.objects.get_id_by_name(geography_type) + geography_id = Geography.objects.get_id_by_name(geography_name) + + # In case a NAME doesn't have an ID + if any( + value == -2 + for value in (theme_id, sub_theme_id, topic_id, metric_id, geography_type_id, geography_id) + ): + return False + + return check_permission_set( + permission_sets, + theme_id, + sub_theme_id, + topic_id, + metric_id, + geography_type_id, + geography_id, + ) + + +def check_permission_set( + permission_sets, + theme_id, + sub_theme_id, + topic_id, + metric_id, + geography_type, + geography_id, +) -> bool: + logger.info(f'Entered check_permission_set()') + if not isinstance(permission_sets, dict): return False if not isinstance(permission_sets.get("permission_sets"), list): @@ -48,33 +137,18 @@ def check_permissions_by_name(permission_sets, theme_name, sub_theme_name, topic if not isinstance(permission_sets.get("summary").get("has_global_access"), bool): return False - logger.info(f'Entered check_permissions_by_name() with theme "{theme_name}" and sub_theme "{sub_theme_name}" and topic "{topic_name}"') - if permission_sets.get("summary").get("has_global_access"): return True else: - for permission in permission_sets.get("permission_sets"): - permission_theme_name = permission.get("theme", {}).get("name") - permission_sub_theme_name = permission.get("sub_theme", {}).get("name") - permission_topic_name = permission.get("topic", {}).get("name") - - if permission_theme_name in WILDCARD_NAME_VALUES: - return True - - if ( - permission_theme_name == theme_name - and permission_sub_theme_name in WILDCARD_NAME_VALUES - ): - return True - - if ( - permission_theme_name == theme_name - and permission_sub_theme_name == sub_theme_name - and (permission_topic_name in WILDCARD_NAME_VALUES + [topic_name]) - ): - return True - - return False + return check_permissions( + permission_sets.get("permission_sets"), + theme_id, + sub_theme_id, + topic_id, + metric_id, + geography_type, + geography_id, + ) def _create_form_field( diff --git a/cms/auth_content/constants.py b/cms/auth_content/constants.py index 9c04bcf3d..0920faa1f 100644 --- a/cms/auth_content/constants.py +++ b/cms/auth_content/constants.py @@ -4,8 +4,6 @@ ) WILDCARD_ID_VALUE = "-1" -WILDCARD_NAME_VALUES = ["* (All themes)", "* (All sub-themes)", "* (All topics)"] - PERMISSION_SET_FIELDS = [ { "field_name": "theme", diff --git a/metrics/data/managers/core_models/geography.py b/metrics/data/managers/core_models/geography.py index 5ff2ce8cb..046721983 100644 --- a/metrics/data/managers/core_models/geography.py +++ b/metrics/data/managers/core_models/geography.py @@ -47,6 +47,19 @@ def get_name_by_code(self, geography_code: str) -> str | None: .first() ) + def get_id_by_name(self, geography_name: str) -> int: + """ + Gets the geography ID for a given geography name. + + Args: + geography_name: The name of the geography to look up + + Returns: + The geography ID if found, or -2 otherwise + """ + record = self.filter(name=geography_name).first() + return int(record.id) if record else -2 + def get_all_geography_codes_by_geography_type( self, geography_type_name: str ) -> Self: @@ -167,6 +180,23 @@ def get_name_by_code(self, geography_code: int) -> str | None: """ return self.get_queryset().get_name_by_code(geography_code) + def get_id_by_name(self, geography_name: str) -> int: + """Gets the geography ID which matches the given geography name. + + Args: + geography_name: The name of the geography to look up + + Returns: + The geography ID if found, -2 otherwise + + Examples: + >>> GeographyManager.get_id_by_name("England") + 6 + >>> GeographyManager.get_id_by_name("Unknown geography") + -2 + """ + return self.get_queryset().get_id_by_name(geography_name) + def get_all_names(self) -> GeographyQuerySet: """Gets all available deduplicated geography names as a flat list queryset. diff --git a/metrics/data/managers/core_models/geography_type.py b/metrics/data/managers/core_models/geography_type.py index ae45b36b7..df85ccd5b 100644 --- a/metrics/data/managers/core_models/geography_type.py +++ b/metrics/data/managers/core_models/geography_type.py @@ -41,6 +41,19 @@ def get_name_by_id(self, geography_type_id: int) -> str | None: """ return self.filter(id=geography_type_id).values_list("name", flat=True).first() + def get_id_by_name(self, geography_type_name: str) -> int: + """ + Gets the geography type ID for a given geography type name. + + Args: + geography_type_name: The name of the geography type to look up + + Returns: + The geography type ID if found, or -2 otherwise + """ + record = self.filter(name=geography_type_name).first() + return int(record.id) if record else -2 + def get_all_names_and_ids(self) -> models.QuerySet: """Gets all available geography_type names as a flat list queryset. @@ -77,6 +90,23 @@ def get_name_by_id(self, geography_type_id: int) -> str | None: """ return self.get_queryset().get_name_by_id(geography_type_id) + def get_id_by_name(self, geography_type_name: str) -> int: + """Gets the geography type ID which matches the given geography type name. + + Args: + geography_type_name: The name of the geography type to look up + + Returns: + The geography type ID if found, -2 otherwise + + Examples: + >>> GeographyTypeManager.get_id_by_name("Nation") + 5 + >>> GeographyTypeManager.get_id_by_name("Unknown type") + -2 + """ + return self.get_queryset().get_id_by_name(geography_type_name) + def get_all_names(self) -> GeographyTypeQuerySet: """Gets all available geography_type names as a flat list queryset. diff --git a/metrics/data/managers/core_models/metric.py b/metrics/data/managers/core_models/metric.py index e9fcb961d..66a3ec60c 100644 --- a/metrics/data/managers/core_models/metric.py +++ b/metrics/data/managers/core_models/metric.py @@ -29,6 +29,19 @@ def get_name_by_id(self, metric_id: int) -> str | None: """ return self.filter(id=metric_id).values_list("name", flat=True).first() + def get_id_by_name(self, metric_name: str) -> int: + """ + Gets the metric ID for a given metric name. + + Args: + metric_name: The name of the metric to look up + + Returns: + The metric ID if found, or -2 otherwise + """ + record = self.filter(name=metric_name).first() + return int(record.id) if record else -2 + def get_all_names(self) -> models.QuerySet: """Gets all available metric names as a flat list queryset. @@ -146,6 +159,23 @@ def get_name_by_id(self, metric_id: int) -> str | None: """ return self.get_queryset().get_name_by_id(metric_id) + def get_id_by_name(self, metric_name: str) -> int: + """Gets the metric ID which matches the given metric name. + + Args: + metric_name: The name of the metric to look up + + Returns: + The metric ID if found, -2 otherwise + + Examples: + >>> MetricManager.get_id_by_name("COVID-19_cases_countRollingMean") + 4 + >>> MetricManager.get_id_by_name("Unknown metric") + -2 + """ + return self.get_queryset().get_id_by_name(metric_name) + def get_all_names(self) -> MetricQuerySet: """Gets all available metric names as a flat list queryset. diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index 0f948b965..8612e6462 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -9,7 +9,6 @@ import logging from collections.abc import Iterable from typing import Self -from cms.auth_content.auth_utils import check_permissions_by_name from django.db import models from django.db.models.query_utils import Q from django.utils import timezone @@ -162,8 +161,6 @@ def filter_for_audit_list_view( def query_for_data( self, *, - theme: str, - sub_theme: str, topic: str, metric: str, date_from: datetime.date, @@ -175,6 +172,8 @@ def query_for_data( stratum: str | None = None, sex: str | None = None, age: str | None = None, + theme: str, + sub_theme: str, metric_value_ranges: list[tuple[str | float | int]] | None = None, permission_sets: dict, ) -> models.QuerySet: @@ -216,6 +215,12 @@ def query_for_data( Note that options are `M`, `F`, or `ALL`. age: The age range to apply additional filtering to. E.g. `0_4` would be used to capture the age of 0-4 years old + theme: The name of the theme being queried. + This is only used to determine permissions for + the non-public portion of the requested dataset. + sub_theme: The name of the sub theme being queried. + This is only used to determine permissions for + the non-public portion of the requested dataset. metric_value_ranges: List of tuples whereby each tuple represents a permissible metric value range. i.e. to filter for all record with values @@ -256,14 +261,17 @@ def query_for_data( if permission_sets: logger.info('Entered if permission_sets clause') + # TODO: Workaround cos circular import error when at the top of the file + from cms.auth_content.auth_utils import check_permissions_by_name + if check_permissions_by_name( permission_sets, theme, sub_theme, topic, - # metric, - # geography_type, - # geography, + metric, + geography_type, + geography, ): logger.info('Entered check_permissions_by_name() if clause') diff --git a/metrics/data/managers/core_models/topic.py b/metrics/data/managers/core_models/topic.py index 00b6f0632..fb654334f 100644 --- a/metrics/data/managers/core_models/topic.py +++ b/metrics/data/managers/core_models/topic.py @@ -40,6 +40,34 @@ def get_name_by_id(self, topic_id: int) -> str | None: """ return self.filter(id=topic_id).values_list("name", flat=True).first() + def get_id_by_name( + self, theme_name: str, sub_theme_name: str, topic_name: str + ) -> tuple[int, int, int]: + """ + Gets the theme, sub-theme and topic IDs matching the given names. + + Args: + theme_name: The name of the parent theme + sub_theme_name: The name of the parent sub-theme + topic_name: The name of the topic to look up + + Returns: + A tuple of (theme_id, sub_theme_id, topic_id) if found, + or the tuple (-2, -2, -2) otherwise + """ + record = ( + self.filter( + sub_theme__theme__name=theme_name, + sub_theme__name=sub_theme_name, + name=topic_name, + ).first() + ) + + if record: + return int(record.sub_theme.theme_id), int(record.sub_theme_id), int(record.id) + + return -2, -2, -2 + def get_all_unique_names(self) -> models.QuerySet: """Gets all available unique topic names as a flat list queryset. @@ -113,6 +141,28 @@ def get_name_by_id(self, topic_id: int) -> str | None: """ return self.get_queryset().get_name_by_id(topic_id) + def get_id_by_name( + self, theme_name: str, sub_theme_name: str, topic_name: str + ) -> tuple[int, int, int]: + """Gets the theme, sub-theme and topic IDs matching the given names. + + Args: + theme_name: The name of the parent theme + sub_theme_name: The name of the parent sub-theme + topic_name: The name of the topic to look up + + Returns: + A tuple of (theme_id, sub_theme_id, topic_id) if found, + or (-2, -2, -2) if not found. + + Examples: + >>> TopicManager.get_id_by_name("Infectious disease", "Respiratory", "COVID-19") + (1, 2, 3) + >>> TopicManager.get_id_by_name("Unknown", "Unknown", "Unknown") + (-2, -2, -2) + """ + return self.get_queryset().get_id_by_name(theme_name, sub_theme_name, topic_name) + def get_all_names(self) -> TopicQuerySet: """Gets all available topic names as a flat list queryset. diff --git a/metrics/domain/models/charts/common.py b/metrics/domain/models/charts/common.py index faaa30a0c..0ba1d3579 100644 --- a/metrics/domain/models/charts/common.py +++ b/metrics/domain/models/charts/common.py @@ -26,12 +26,16 @@ class BaseChartRequestParams(BaseModel): class Config: arbitrary_types_allowed = True - @property - def rbac_permissions(self) -> Iterable["RBACPermission"]: - return getattr(self.request, "rbac_permissions", []) - @property def permission_sets(self) -> dict: + """Extract JWT permissions from the authenticated request""" + logger.info(f'Entered BaseChartRequestParams.permission_sets') return getattr(self.request.user, "permission_sets", {}) + + @property + def rbac_permissions(self) -> Iterable["RBACPermission"]: + """TODO: RBAC-based permissions are legacy and will be removed in a future release""" + + return getattr(self.request, "rbac_permissions", []) diff --git a/metrics/interfaces/plots/access.py b/metrics/interfaces/plots/access.py index efad04938..28e8a5eb0 100644 --- a/metrics/interfaces/plots/access.py +++ b/metrics/interfaces/plots/access.py @@ -165,8 +165,8 @@ def get_queryset_from_core_model_manager( return self.core_model_manager.query_for_data( **plot_params, - rbac_permissions=self.chart_request_params.rbac_permissions, - permission_sets = self.chart_request_params.permission_sets, + rbac_permissions=self.chart_request_params.rbac_permissions, # TODO: Remove old permissions + permission_sets = self.chart_request_params.permission_sets, # new permissions ) def build_plot_data_from_parameters_with_complete_queryset( From bf2dac15cc27dfab8bfc0d9c863cd5af76832f1a Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Fri, 29 May 2026 10:27:18 +0100 Subject: [PATCH 3/6] CDD-3173: let cms/dashboard/viewsets.py from CDD-3171 use my fully equivalent check_permissions() function instead --- cms/dashboard/viewsets.py | 30 +----------------------------- 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/cms/dashboard/viewsets.py b/cms/dashboard/viewsets.py index 89c41446f..89b2c1ddc 100644 --- a/cms/dashboard/viewsets.py +++ b/cms/dashboard/viewsets.py @@ -9,40 +9,12 @@ from wagtail.api.v2.views import PagesAPIViewSet from caching.private_api.decorators import cache_response -from cms.auth_content.constants import WILDCARD_ID_VALUE +from cms.auth_content.auth_utils import check_permissions from cms.dashboard.serializers import CMSDraftPagesSerializer, ListablePageSerializer from cms.metrics_documentation.models.child import MetricsDocumentationChildEntry from cms.topic.models import TopicPage -def check_permissions(user_permissions, theme_id, sub_theme_id, topic_id) -> bool: - if not isinstance(user_permissions, list): - return False - - for permission in user_permissions: - permission_theme_id = permission.get("theme", {}).get("id") - permission_sub_theme_id = permission.get("sub_theme", {}).get("id") - permission_topic_id = permission.get("topic", {}).get("id") - - if permission_theme_id == WILDCARD_ID_VALUE: - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == WILDCARD_ID_VALUE - ): - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == sub_theme_id - and (permission_topic_id in {WILDCARD_ID_VALUE, topic_id}) - ): - return True - - return False - - @extend_schema(tags=["cms"]) class CMSPagesAPIViewSet(PagesAPIViewSet): # This is the /pages (or proxy/pages env dependent endpoint) From f6dc30038066cf922a259f2f135b4fca6f8eb5f0 Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Fri, 29 May 2026 10:28:30 +0100 Subject: [PATCH 4/6] CDD-3173: add debugging code to user_manager.py to be able to test this JIRA ticket in isolation --- common/auth/cognito_jwt/user_manager.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py index 3a69470a4..0c4d2550d 100644 --- a/common/auth/cognito_jwt/user_manager.py +++ b/common/auth/cognito_jwt/user_manager.py @@ -17,6 +17,23 @@ def get_or_create_for_cognito(jwt_payload): try: username = jwt_payload["entraObjectId"] permission_sets = jwt_payload["permissionSets"] + + # DEBUGGING: Manual testing (just for now) + # username = "{YOUR_ENTRA_OBJECT_ID}" + # permission_sets = { + # "permission_sets": [ + # { + # "theme": {"id": "100", "name": "immunisation"}, + # "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + # "topic": {"id": "-1", "name": "* (All)"}, + # "metric": {"id": "-1", "name": "* (All)"}, + # "geography_type": {"id": "-1", "name": "* (All)"}, + # "geography": {"id": "-1", "name": "* (All)"}, + # } + # ], + # "summary": {"has_global_access": False}, + # } + if not permission_sets: logger.debug( "Empty permissionSets in token for user: '%s'", From 6d65dde1e0033ef69ac0cdf844bd69cd4398a18d Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Fri, 29 May 2026 16:17:34 +0100 Subject: [PATCH 5/6] CDD-3173: evaluate metric- and geography-related permissions separately --- cms/auth_content/auth_utils.py | 258 +++++++++++++++++------- common/auth/cognito_jwt/user_manager.py | 6 +- 2 files changed, 183 insertions(+), 81 deletions(-) diff --git a/cms/auth_content/auth_utils.py b/cms/auth_content/auth_utils.py index c724e78a3..98e5a3fd4 100644 --- a/cms/auth_content/auth_utils.py +++ b/cms/auth_content/auth_utils.py @@ -8,81 +8,6 @@ logger = logging.getLogger(__name__) -def check_permissions( - permission_sets, - theme_id, - sub_theme_id, - topic_id, - metric_id=0, - geography_type=0, - geography_id=0, -) -> bool: - if not isinstance(permission_sets, list): - return False - - # TODO: Can we make them all ID at source? - theme_id = str(theme_id) - sub_theme_id = str(sub_theme_id) - topic_id = str(topic_id) - metric_id = str(metric_id) - geography_type = str(geography_type) - geography_id = str(geography_id) - - for permission in permission_sets: - permission_theme_id = permission.get("theme", {}).get("id") - permission_sub_theme_id = permission.get("sub_theme", {}).get("id") - permission_topic_id = permission.get("topic", {}).get("id") - permission_metric_id = permission.get("metric", {}).get("id") - permission_geography_type = permission.get("geography_type", {}).get("id") - permission_geography_id = permission.get("geography", {}).get("id") - - if permission_theme_id == WILDCARD_ID_VALUE: - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == WILDCARD_ID_VALUE - ): - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == sub_theme_id - and permission_topic_id in {WILDCARD_ID_VALUE, topic_id} - ): - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == sub_theme_id - and permission_topic_id == topic_id - and permission_metric_id in {WILDCARD_ID_VALUE, metric_id} - ): - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == sub_theme_id - and permission_topic_id == topic_id - and permission_metric_id == metric_id - and permission_geography_type in {WILDCARD_ID_VALUE, geography_type} - ): - return True - - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == sub_theme_id - and permission_topic_id == topic_id - and permission_metric_id == metric_id - and permission_geography_type == geography_type - and permission_geography_id in {WILDCARD_ID_VALUE, geography_id} - ): - return True - - return False - - - def check_permissions_by_name( permission_sets, theme_name, @@ -92,6 +17,11 @@ def check_permissions_by_name( geography_type, geography_name, ) -> bool: + """ + This is a wrapper that converts permission resource names + into ids. It is only used to check CHART permissions. + """ + logger.info(f'Entered check_permissions_by_name()') theme_id, sub_theme_id, topic_id = Topic.objects.get_id_by_name(theme_name, sub_theme_name, topic_name) @@ -99,10 +29,10 @@ def check_permissions_by_name( geography_type_id = GeographyType.objects.get_id_by_name(geography_type) geography_id = Geography.objects.get_id_by_name(geography_name) - # In case a NAME doesn't have an ID + # Be safe, just in case a NAME doesn't have an ID if any( - value == -2 - for value in (theme_id, sub_theme_id, topic_id, metric_id, geography_type_id, geography_id) + value == -2 + for value in (theme_id, sub_theme_id, topic_id, metric_id, geography_type_id, geography_id) ): return False @@ -126,6 +56,27 @@ def check_permission_set( geography_type, geography_id, ) -> bool: + """ + This is a wrapper that only checks for global permissions, and + delegates further checks to our core permission checking function. + It is only used to check CHART permissions. + + @param {dict} permission_sets which contains a permission_sets list, eg: + { + "permission_sets": [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, + "metric": {"id": "-1", "name": "* (All)"}, + "geography_type": {"id": "300", "name": "Nation"}, + "geography": {"id": "-1", "name": "* (All)"}, + } + ], + "summary": {"has_global_access": False}, + } + """ + logger.info(f'Entered check_permission_set()') if not isinstance(permission_sets, dict): @@ -150,6 +101,157 @@ def check_permission_set( geography_id, ) +def check_permissions( + permission_sets, + theme_id, + sub_theme_id, + topic_id, + metric_id=0, + geography_type=0, + geography_id=0, +) -> bool: + """ + This is our core permission-checking function It is + used to check both PAGE & CHART permissions. + + Metric- and geography-related permissions must be + evaluated separately (spec says). + + @param {list} permission_sets, eg: + [ + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, + "metric": {"id": "-1", "name": "* (All)"}, + "geography_type": {"id": "300", "name": "Nation"}, + "geography": {"id": "-1", "name": "* (All)"}, + } + ] + """ + + logger.info(f'Entered check_permissions()') + + if not isinstance(permission_sets, list): + return False + + for permission_set in permission_sets: + if ( + check_metric_permissions(permission_set, theme_id, sub_theme_id, topic_id, metric_id) and + check_geography_permissions(permission_set, geography_type, geography_id) + ): + return True + + return False + + +def check_metric_permissions( + permission_set, + theme_id, + sub_theme_id, + topic_id, + metric_id=0, +) -> bool: + """ + Make sure that every theme, sub_theme, topic and metric + match or have a wildcard at the end (only look at the + first 4 attributes of permission_set). + + @param {dict} permission_set, eg: + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, + "metric": {"id": "-1", "name": "* (All)"}, + "geography_type": {"id": "300", "name": "Nation"}, + "geography": {"id": "-1", "name": "* (All)"}, + } + """ + + logger.info(f'Entered check_metric_permissions()') + + if not isinstance(permission_set, dict): + return False + + theme_id = str(theme_id) + sub_theme_id = str(sub_theme_id) + topic_id = str(topic_id) + metric_id = str(metric_id) + + permission_theme_id = str(permission_set.get("theme", {}).get("id")) + permission_sub_theme_id = str(permission_set.get("sub_theme", {}).get("id")) + permission_topic_id = str(permission_set.get("topic", {}).get("id")) + permission_metric_id = str(permission_set.get("metric", {}).get("id")) + + if permission_theme_id == WILDCARD_ID_VALUE: + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == WILDCARD_ID_VALUE + ): + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and permission_topic_id in {WILDCARD_ID_VALUE, topic_id} + ): + return True + + if ( + permission_theme_id == theme_id + and permission_sub_theme_id == sub_theme_id + and permission_topic_id == topic_id + and permission_metric_id in {WILDCARD_ID_VALUE, metric_id} + ): + return True + + return False + + +def check_geography_permissions( + permission_set, + geography_type, + geography_id, +) -> bool: + """ + Make sure that both geography_type and geography + match or have a wildcard at the end (only look at the + first 2 attributes of permission_set). + + @param {dict} permission_set, eg: + { + "theme": {"id": "100", "name": "immunisation"}, + "sub_theme": {"id": "200", "name": "childhood-vaccines"}, + "topic": {"id": "-1", "name": "* (All)"}, + "metric": {"id": "-1", "name": "* (All)"}, + "geography_type": {"id": "300", "name": "Nation"}, + "geography": {"id": "-1", "name": "* (All)"}, + } + """ + + logger.info(f'Entered check_geography_permissions()') + + if not isinstance(permission_set, dict): + return False + + geography_type = str(geography_type) + geography_id = str(geography_id) + + permission_geography_type = str(permission_set.get("geography_type", {}).get("id")) + permission_geography_id = str(permission_set.get("geography", {}).get("id")) + + if permission_geography_type == WILDCARD_ID_VALUE: + return True + + if ( + permission_geography_type == geography_type + and permission_geography_id in {WILDCARD_ID_VALUE, geography_id} + ): + return True + + return False def _create_form_field( field: dict[str, str | Callable | None], wildcard_id_value=None diff --git a/common/auth/cognito_jwt/user_manager.py b/common/auth/cognito_jwt/user_manager.py index 0c4d2550d..a101168e0 100644 --- a/common/auth/cognito_jwt/user_manager.py +++ b/common/auth/cognito_jwt/user_manager.py @@ -24,11 +24,11 @@ def get_or_create_for_cognito(jwt_payload): # "permission_sets": [ # { # "theme": {"id": "100", "name": "immunisation"}, - # "sub_theme": {"id": "133", "name": "childhood-vaccines"}, + # "sub_theme": {"id": "200", "name": "childhood-vaccines"}, # "topic": {"id": "-1", "name": "* (All)"}, # "metric": {"id": "-1", "name": "* (All)"}, - # "geography_type": {"id": "-1", "name": "* (All)"}, - # "geography": {"id": "-1", "name": "* (All)"}, + # "geography_type": {"id": "300", "name": "Nation"}, + # "geography": {"id": "400", "name": "England"}, # } # ], # "summary": {"has_global_access": False}, From 2d3677e180980485a83cd7a5072d30a8382b8153 Mon Sep 17 00:00:00 2001 From: Dan Dammann Date: Fri, 29 May 2026 17:14:55 +0100 Subject: [PATCH 6/6] CDD-3173: lint --- cms/auth_content/auth_utils.py | 102 +++++++++++------- .../data/managers/core_models/time_series.py | 22 ++-- metrics/data/managers/core_models/topic.py | 28 +++-- metrics/domain/models/charts/common.py | 3 +- metrics/interfaces/plots/access.py | 6 +- 5 files changed, 96 insertions(+), 65 deletions(-) diff --git a/cms/auth_content/auth_utils.py b/cms/auth_content/auth_utils.py index 98e5a3fd4..cb54ffd23 100644 --- a/cms/auth_content/auth_utils.py +++ b/cms/auth_content/auth_utils.py @@ -1,13 +1,20 @@ import logging from collections.abc import Callable -from cms.auth_content.constants import WILDCARD_ID_VALUE from django import forms + +from cms.auth_content.constants import WILDCARD_ID_VALUE from cms.dynamic_content import help_texts -from metrics.data.models.core_models.supporting import Geography, GeographyType, Metric, Topic +from metrics.data.models.core_models.supporting import ( + Geography, + GeographyType, + Metric, + Topic, +) logger = logging.getLogger(__name__) + def check_permissions_by_name( permission_sets, theme_name, @@ -22,9 +29,11 @@ def check_permissions_by_name( into ids. It is only used to check CHART permissions. """ - logger.info(f'Entered check_permissions_by_name()') + logger.info("Entered check_permissions_by_name()") - theme_id, sub_theme_id, topic_id = Topic.objects.get_id_by_name(theme_name, sub_theme_name, topic_name) + theme_id, sub_theme_id, topic_id = Topic.objects.get_id_by_name( + theme_name, sub_theme_name, topic_name + ) metric_id = Metric.objects.get_id_by_name(metric_name) geography_type_id = GeographyType.objects.get_id_by_name(geography_type) geography_id = Geography.objects.get_id_by_name(geography_name) @@ -32,7 +41,14 @@ def check_permissions_by_name( # Be safe, just in case a NAME doesn't have an ID if any( value == -2 - for value in (theme_id, sub_theme_id, topic_id, metric_id, geography_type_id, geography_id) + for value in ( + theme_id, + sub_theme_id, + topic_id, + metric_id, + geography_type_id, + geography_id, + ) ): return False @@ -77,7 +93,7 @@ def check_permission_set( } """ - logger.info(f'Entered check_permission_set()') + logger.info("Entered check_permission_set()") if not isinstance(permission_sets, dict): return False @@ -90,25 +106,26 @@ def check_permission_set( if permission_sets.get("summary").get("has_global_access"): return True - else: - return check_permissions( - permission_sets.get("permission_sets"), - theme_id, - sub_theme_id, - topic_id, - metric_id, - geography_type, - geography_id, - ) + + return check_permissions( + permission_sets.get("permission_sets"), + theme_id, + sub_theme_id, + topic_id, + metric_id, + geography_type, + geography_id, + ) + def check_permissions( permission_sets, theme_id, sub_theme_id, topic_id, - metric_id=0, - geography_type=0, - geography_id=0, + metric_id=None, + geography_type=None, + geography_id=None, ) -> bool: """ This is our core permission-checking function It is @@ -130,27 +147,36 @@ def check_permissions( ] """ - logger.info(f'Entered check_permissions()') + logger.info("Entered check_permissions()") if not isinstance(permission_sets, list): return False for permission_set in permission_sets: - if ( - check_metric_permissions(permission_set, theme_id, sub_theme_id, topic_id, metric_id) and - check_geography_permissions(permission_set, geography_type, geography_id) - ): - return True + if geography_type and geography_id: + # CHART permissions + if check_metric_related_permissions( + permission_set, theme_id, sub_theme_id, topic_id, metric_id + ) and check_geography_permissions( + permission_set, geography_type, geography_id + ): + return True + else: + # PAGE permissions + if check_metric_related_permissions( + permission_set, theme_id, sub_theme_id, topic_id, metric_id + ): + return True return False -def check_metric_permissions( +def check_metric_related_permissions( permission_set, theme_id, sub_theme_id, topic_id, - metric_id=0, + metric_id=None, ) -> bool: """ Make sure that every theme, sub_theme, topic and metric @@ -168,7 +194,7 @@ def check_metric_permissions( } """ - logger.info(f'Entered check_metric_permissions()') + logger.info("Entered check_metric_related_permissions()") if not isinstance(permission_set, dict): return False @@ -186,10 +212,7 @@ def check_metric_permissions( if permission_theme_id == WILDCARD_ID_VALUE: return True - if ( - permission_theme_id == theme_id - and permission_sub_theme_id == WILDCARD_ID_VALUE - ): + if permission_theme_id == theme_id and permission_sub_theme_id == WILDCARD_ID_VALUE: return True if ( @@ -212,8 +235,8 @@ def check_metric_permissions( def check_geography_permissions( permission_set, - geography_type, - geography_id, + geography_type=None, + geography_id=None, ) -> bool: """ Make sure that both geography_type and geography @@ -231,7 +254,7 @@ def check_geography_permissions( } """ - logger.info(f'Entered check_geography_permissions()') + logger.info("Entered check_geography_permissions()") if not isinstance(permission_set, dict): return False @@ -245,14 +268,15 @@ def check_geography_permissions( if permission_geography_type == WILDCARD_ID_VALUE: return True - if ( - permission_geography_type == geography_type - and permission_geography_id in {WILDCARD_ID_VALUE, geography_id} - ): + if permission_geography_type == geography_type and permission_geography_id in { + WILDCARD_ID_VALUE, + geography_id, + }: return True return False + def _create_form_field( field: dict[str, str | Callable | None], wildcard_id_value=None ) -> forms.CharField: diff --git a/metrics/data/managers/core_models/time_series.py b/metrics/data/managers/core_models/time_series.py index 8612e6462..5b63aaca2 100644 --- a/metrics/data/managers/core_models/time_series.py +++ b/metrics/data/managers/core_models/time_series.py @@ -9,13 +9,13 @@ import logging from collections.abc import Iterable from typing import Self + from django.db import models from django.db.models.query_utils import Q from django.utils import timezone from metrics.api.permissions.fluent_permissions import ( is_public_data_only_enforced, - validate_permissions_for_non_public, ) from metrics.data.models import RBACPermission @@ -23,6 +23,7 @@ logger = logging.getLogger(__name__) + class CoreTimeSeriesQuerySet(models.QuerySet): """Custom queryset which can be used by the `CoreTimeSeriesManager`""" @@ -238,7 +239,8 @@ def query_for_data( ]>` """ - logger.info('Entered query_for_data()') + + logger.info("Entered query_for_data()") queryset = self.filter( metric__topic__name=topic, @@ -254,14 +256,12 @@ def query_for_data( sex=sex, age=age, ) - public_queryset = queryset.filter( - is_public=True - ) + public_queryset = queryset.filter(is_public=True) if permission_sets: - logger.info('Entered if permission_sets clause') + logger.info("Entered if permission_sets clause") - # TODO: Workaround cos circular import error when at the top of the file + # WORKAROUND: Cos circular import error when at the top of the file from cms.auth_content.auth_utils import check_permissions_by_name if check_permissions_by_name( @@ -273,13 +273,11 @@ def query_for_data( geography_type, geography, ): - logger.info('Entered check_permissions_by_name() if clause') + logger.info("Entered check_permissions_by_name() if clause") - queryset = public_queryset + queryset.filter( - is_public=False - ) + queryset = public_queryset + queryset.filter(is_public=False) else: - logger.info('Entered else permission_sets clause') + logger.info("Entered else permission_sets clause") queryset = public_queryset diff --git a/metrics/data/managers/core_models/topic.py b/metrics/data/managers/core_models/topic.py index fb654334f..cd67e9f1a 100644 --- a/metrics/data/managers/core_models/topic.py +++ b/metrics/data/managers/core_models/topic.py @@ -55,18 +55,24 @@ def get_id_by_name( A tuple of (theme_id, sub_theme_id, topic_id) if found, or the tuple (-2, -2, -2) otherwise """ - record = ( - self.filter( - sub_theme__theme__name=theme_name, - sub_theme__name=sub_theme_name, - name=topic_name, - ).first() - ) + record = self.filter( + sub_theme__theme__name=theme_name, + sub_theme__name=sub_theme_name, + name=topic_name, + ).first() if record: - return int(record.sub_theme.theme_id), int(record.sub_theme_id), int(record.id) + return ( + int(record.sub_theme.theme_id), + int(record.sub_theme_id), + int(record.id), + ) - return -2, -2, -2 + return ( + -2, + -2, + -2, + ) def get_all_unique_names(self) -> models.QuerySet: """Gets all available unique topic names as a flat list queryset. @@ -161,7 +167,9 @@ def get_id_by_name( >>> TopicManager.get_id_by_name("Unknown", "Unknown", "Unknown") (-2, -2, -2) """ - return self.get_queryset().get_id_by_name(theme_name, sub_theme_name, topic_name) + return self.get_queryset().get_id_by_name( + theme_name, sub_theme_name, topic_name + ) def get_all_names(self) -> TopicQuerySet: """Gets all available topic names as a flat list queryset. diff --git a/metrics/domain/models/charts/common.py b/metrics/domain/models/charts/common.py index 0ba1d3579..7608749d3 100644 --- a/metrics/domain/models/charts/common.py +++ b/metrics/domain/models/charts/common.py @@ -8,6 +8,7 @@ logger = logging.getLogger(__name__) + class BaseChartRequestParams(BaseModel): file_format: Literal["png", "svg", "jpg", "jpeg", "json", "csv"] chart_width: int @@ -30,7 +31,7 @@ class Config: def permission_sets(self) -> dict: """Extract JWT permissions from the authenticated request""" - logger.info(f'Entered BaseChartRequestParams.permission_sets') + logger.info("Entered BaseChartRequestParams.permission_sets") return getattr(self.request.user, "permission_sets", {}) diff --git a/metrics/interfaces/plots/access.py b/metrics/interfaces/plots/access.py index 28e8a5eb0..e8bcb0406 100644 --- a/metrics/interfaces/plots/access.py +++ b/metrics/interfaces/plots/access.py @@ -161,12 +161,12 @@ def get_queryset_from_core_model_manager( plot_params["fields_to_export"].append("upper_confidence") plot_params["fields_to_export"].append("lower_confidence") - logger.info('Entered access.py') + logger.info("Entered access.py") return self.core_model_manager.query_for_data( **plot_params, - rbac_permissions=self.chart_request_params.rbac_permissions, # TODO: Remove old permissions - permission_sets = self.chart_request_params.permission_sets, # new permissions + rbac_permissions=self.chart_request_params.rbac_permissions, # old permissions (remove) + permission_sets=self.chart_request_params.permission_sets, # new permissions ) def build_plot_data_from_parameters_with_complete_queryset(