From 9515bbf286efc07e9605c37517afd5652235fff5 Mon Sep 17 00:00:00 2001 From: Nimish Date: Tue, 7 Apr 2026 21:40:05 +0800 Subject: [PATCH 1/6] fix: add input validation utilities and harden token parsing - Add validate_text_field(): rejects non-string types, strips HTML tags (XSS defence-in-depth via django.utils.html.strip_tags), enforces max length - Add validate_email_address(): uses Django's built-in EmailValidator - Fix get_token_type(): handle empty/malformed bearer tokens that caused 500 IndexError --- backend/api/utils/rest.py | 52 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/backend/api/utils/rest.py b/backend/api/utils/rest.py index a90914657..15c3db25d 100644 --- a/backend/api/utils/rest.py +++ b/backend/api/utils/rest.py @@ -1,5 +1,8 @@ from api.models import EnvironmentToken, ServiceAccountToken, ServiceToken, UserToken from django.utils import timezone +from django.utils.html import strip_tags +from django.core.validators import validate_email +from django.core.exceptions import ValidationError as DjangoValidationError import base64 from api.utils.access.ip import get_client_ip @@ -20,7 +23,10 @@ def get_resolver_request_meta(request): def get_token_type(auth_token): - return auth_token.split(" ")[1] + parts = auth_token.split(" ") + if len(parts) < 2 or not parts[1]: + return None + return parts[1] def get_env_from_service_token(auth_token): @@ -101,6 +107,50 @@ def token_is_expired_or_deleted(auth_token): ) +def validate_text_field(value, field_name, max_length=None, required=True): + """Validate and sanitize a text field from request data. + + Returns (cleaned_value, error_message). + If error_message is not None, the caller should return a 400 response. + """ + if value is None or (isinstance(value, str) and not value.strip()): + if required: + return None, f"Missing required field: {field_name}" + return None, None + + if not isinstance(value, str): + return None, f"'{field_name}' must be a string." + + cleaned = strip_tags(value).strip() + if required and not cleaned: + return None, f"Missing required field: {field_name}" + + if max_length and len(cleaned) > max_length: + return None, f"'{field_name}' cannot exceed {max_length} characters." + + return cleaned, None + + +def validate_email_address(email): + """Validate an email address using Django's built-in validator. + + Returns (cleaned_email, error_message). + """ + if not email or not isinstance(email, str): + return None, "Missing required field: email" + + cleaned = email.strip().lower() + if not cleaned: + return None, "Missing required field: email" + + try: + validate_email(cleaned) + except DjangoValidationError: + return None, f"'{cleaned}' is not a valid email address." + + return cleaned, None + + def encode_string_to_base64(s): # Convert string to bytes byte_representation = s.encode("utf-8") From c110d9c8e0aeced5918efc8e1298247600cd95ae Mon Sep 17 00:00:00 2001 From: Nimish Date: Tue, 7 Apr 2026 21:40:14 +0800 Subject: [PATCH 2/6] fix: prevent 500 on cross-org app/environment access user_can_access_app() and user_can_access_environment() did bare .get() calls that threw unhandled DoesNotExist when a user from org B accessed a resource belonging to org A. Wrap in try/except and return False so the auth layer returns 403 instead of crashing. Same fix applied to service_account_can_access_environment(). --- backend/api/utils/access/permissions.py | 33 ++++++++++++++++--------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/backend/api/utils/access/permissions.py b/backend/api/utils/access/permissions.py index 4a08982a5..7fd1b654d 100644 --- a/backend/api/utils/access/permissions.py +++ b/backend/api/utils/access/permissions.py @@ -24,10 +24,13 @@ def user_can_access_app(user_id, app_id): OrganisationMember = apps.get_model("api", "OrganisationMember") App = apps.get_model("api", "App") - app = App.objects.get(id=app_id) - org_member = OrganisationMember.objects.get( - user_id=user_id, organisation=app.organisation, deleted_at=None - ) + try: + app = App.objects.get(id=app_id) + org_member = OrganisationMember.objects.get( + user_id=user_id, organisation=app.organisation, deleted_at=None + ) + except (App.DoesNotExist, OrganisationMember.DoesNotExist): + return False return org_member in app.members.all() @@ -36,10 +39,13 @@ def user_can_access_environment(user_id, env_id): Environment = apps.get_model("api", "Environment") EnvironmentKey = apps.get_model("api", "EnvironmentKey") - env = Environment.objects.get(id=env_id) - org_member = OrganisationMember.objects.get( - organisation=env.app.organisation, user_id=user_id, deleted_at=None - ) + try: + env = Environment.objects.get(id=env_id) + org_member = OrganisationMember.objects.get( + organisation=env.app.organisation, user_id=user_id, deleted_at=None + ) + except (Environment.DoesNotExist, OrganisationMember.DoesNotExist): + return False return EnvironmentKey.objects.filter( user_id=org_member, environment_id=env_id ).exists() @@ -50,10 +56,13 @@ def service_account_can_access_environment(account_id, env_id): EnvironmentKey = apps.get_model("api", "EnvironmentKey") ServiceAccount = apps.get_model("api", "ServiceAccount") - env = Environment.objects.get(id=env_id) - service_account = ServiceAccount.objects.get( - organisation=env.app.organisation, id=account_id, deleted_at=None - ) + try: + env = Environment.objects.get(id=env_id) + service_account = ServiceAccount.objects.get( + organisation=env.app.organisation, id=account_id, deleted_at=None + ) + except (Environment.DoesNotExist, ServiceAccount.DoesNotExist): + return False return EnvironmentKey.objects.filter( service_account=service_account, environment_id=env_id ).exists() From d67e10892e009ce40045bda657d2562ffa2d2b28 Mon Sep 17 00:00:00 2001 From: Nimish Date: Tue, 7 Apr 2026 21:40:22 +0800 Subject: [PATCH 3/6] fix: protect Owner from API modification and harden SA member management - Owner member's role cannot be changed via the API (all actors blocked, not just self-update). Directs to the ownership transfer flow. - Owner member cannot be removed via the API (all actors blocked). - SAs cannot modify or remove members with global-access roles (e.g. Admin). Previously the global-access check only applied to User tokens. - Member invite email validated with Django's EmailValidator (previously accepted arbitrary strings like "not-an-email"). - Unsupported HTTP methods now return 405 instead of 403. --- backend/api/views/members.py | 51 +++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/backend/api/views/members.py b/backend/api/views/members.py index 34026b542..f2803c9d6 100644 --- a/backend/api/views/members.py +++ b/backend/api/views/members.py @@ -8,7 +8,7 @@ from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework.response import Response from rest_framework import status from djangorestframework_camel_case.render import CamelCaseJSONRenderer @@ -32,7 +32,7 @@ from api.utils.audit_logging import log_audit_event, get_actor_info, get_member_display_name from api.utils.crypto import decrypt_asymmetric, get_server_keypair from api.utils.environments import _ed25519_pk_to_curve25519, _wrap_env_secrets_for_key -from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta +from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta, validate_email_address from api.throttling import PlanBasedRateThrottle from api.utils.access.middleware import IsIPAllowed from backend.quotas import can_add_account @@ -93,7 +93,7 @@ def initial(self, request, *args, **kwargs): super().initial(request, *args, **kwargs) action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) _check_permission(request, action) def get(self, request, *args, **kwargs): @@ -109,14 +109,13 @@ def post(self, request, *args, **kwargs): org = _get_org(request) invited_by = request.auth.get("org_member") # None for SA tokens - email = request.data.get("email", "").strip().lower() + raw_email = request.data.get("email", "") role_id = request.data.get("role_id") - if not email: - return Response( - {"error": "Missing required field: email"}, - status=status.HTTP_400_BAD_REQUEST, - ) + email, err = validate_email_address(raw_email) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) + if not role_id: return Response( {"error": "Missing required field: role_id"}, @@ -237,7 +236,7 @@ def initial(self, request, *args, **kwargs): super().initial(request, *args, **kwargs) action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) _check_permission(request, action) def _get_member(self, member_id, org): @@ -261,8 +260,15 @@ def put(self, request, member_id, *args, **kwargs): if not member: return Response({"error": "Member not found."}, status=status.HTTP_404_NOT_FOUND) - # Prevent self role-update + # The Owner role is immutable via the API — use the ownership transfer flow + if member.role.is_default and member.role.name.lower() == "owner": + return Response( + {"error": "The Owner's role cannot be changed via the API. Use the ownership transfer flow."}, + status=status.HTTP_403_FORBIDDEN, + ) + if request.auth["auth_type"] == "User": + # Prevent self role-update acting_member = request.auth["org_member"] if str(acting_member.id) == str(member.id): return Response( @@ -277,6 +283,13 @@ def put(self, request, member_id, *args, **kwargs): {"error": "You cannot update the role of a member with global access."}, status=status.HTTP_403_FORBIDDEN, ) + elif request.auth["auth_type"] == "ServiceAccount": + # SAs cannot modify members with global-access roles (e.g. Admin) + if role_has_global_access(member.role): + return Response( + {"error": "Service accounts cannot modify members with global access."}, + status=status.HTTP_403_FORBIDDEN, + ) role_id = request.data.get("role_id") if not role_id: @@ -339,13 +352,27 @@ def delete(self, request, member_id, *args, **kwargs): if not member: return Response({"error": "Member not found."}, status=status.HTTP_404_NOT_FOUND) - # Prevent self-removal + # The Owner cannot be removed via the API + if member.role.is_default and member.role.name.lower() == "owner": + return Response( + {"error": "The Owner cannot be removed via the API. Use the ownership transfer flow."}, + status=status.HTTP_403_FORBIDDEN, + ) + if request.auth["auth_type"] == "User": + # Prevent self-removal if str(request.auth["org_member"].id) == str(member.id): return Response( {"error": "You cannot remove yourself from the organisation."}, status=status.HTTP_403_FORBIDDEN, ) + elif request.auth["auth_type"] == "ServiceAccount": + # SAs cannot remove members with global-access roles (e.g. Admin) + if role_has_global_access(member.role): + return Response( + {"error": "Service accounts cannot remove members with global access."}, + status=status.HTTP_403_FORBIDDEN, + ) member_display_name = get_member_display_name(member) member_email = member.user.email From efaaeec94f65cee9592ec95a1714818751c97701 Mon Sep 17 00:00:00 2001 From: Nimish Date: Tue, 7 Apr 2026 21:40:30 +0800 Subject: [PATCH 4/6] fix: block role escalation via update and accept camelCase permissions - Prevent enabling global_access on a role that has service accounts assigned. Previously an SA's role could be updated to global_access after the SA was created, bypassing the creation-time check. - Add _normalize_permissions() to accept both camelCase (appPermissions, globalAccess) and snake_case (app_permissions, global_access) keys, enabling GET response round-tripping back to POST/PUT. - Apply validate_text_field() to name and description fields. - Unsupported HTTP methods now return 405 instead of 403. --- backend/api/views/roles.py | 86 +++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 44 deletions(-) diff --git a/backend/api/views/roles.py b/backend/api/views/roles.py index 593765ad3..8e2c0e403 100644 --- a/backend/api/views/roles.py +++ b/backend/api/views/roles.py @@ -7,13 +7,13 @@ from api.utils.access.permissions import user_has_permission from api.utils.access.roles import default_roles from api.utils.audit_logging import log_audit_event, get_actor_info, build_change_values -from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta +from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta, validate_text_field from api.throttling import PlanBasedRateThrottle from api.utils.access.middleware import IsIPAllowed from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework.response import Response from rest_framework import status from djangorestframework_camel_case.render import CamelCaseJSONRenderer @@ -30,6 +30,12 @@ } +def _normalize_permissions(permissions): + """Normalise camelCase keys to snake_case so GET responses can be round-tripped.""" + key_map = {"appPermissions": "app_permissions", "globalAccess": "global_access"} + return {key_map.get(k, k): v for k, v in permissions.items()} + + def _validate_permissions(permissions): """ Validate the shape of a permissions object against the Owner role template. @@ -123,7 +129,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -159,18 +165,9 @@ def post(self, request, *args, **kwargs): ) # Validate name - name = request.data.get("name") - if not name or not str(name).strip(): - return Response( - {"error": "Missing required field: name"}, - status=status.HTTP_400_BAD_REQUEST, - ) - name = str(name).strip() - if len(name) > 64: - return Response( - {"error": "Role name cannot exceed 64 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + name, err = validate_text_field(request.data.get("name"), "name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) # Duplicate name check if Role.objects.filter(organisation=org, name__iexact=name).exists(): @@ -187,6 +184,7 @@ def post(self, request, *args, **kwargs): status=status.HTTP_400_BAD_REQUEST, ) + permissions = _normalize_permissions(permissions) perm_error = _validate_permissions(permissions) if perm_error: return Response( @@ -195,12 +193,11 @@ def post(self, request, *args, **kwargs): ) # Optional fields - description = request.data.get("description", "") - if description and len(str(description)) > 500: - return Response( - {"error": "Role description cannot exceed 500 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + raw_desc = request.data.get("description", "") + description, err = validate_text_field(raw_desc, "description", max_length=500, required=False) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) + description = description or "" color = request.data.get("color", "") if color and len(str(color)) > 7: @@ -261,7 +258,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -316,8 +313,8 @@ def put(self, request, role_id, *args, **kwargs): status=status.HTTP_403_FORBIDDEN, ) - name = request.data.get("name") - description = request.data.get("description") + raw_name = request.data.get("name") + raw_desc = request.data.get("description") color = request.data.get("color") permissions = request.data.get("permissions") @@ -325,24 +322,16 @@ def put(self, request, role_id, *args, **kwargs): role, ["name", "description", "color", "permissions"], request.data ) - if name is None and description is None and color is None and permissions is None: + if raw_name is None and raw_desc is None and color is None and permissions is None: return Response( {"error": "At least one field must be provided."}, status=status.HTTP_400_BAD_REQUEST, ) - if name is not None: - if not str(name).strip(): - return Response( - {"error": "Role name cannot be blank."}, - status=status.HTTP_400_BAD_REQUEST, - ) - name = str(name).strip() - if len(name) > 64: - return Response( - {"error": "Role name cannot exceed 64 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + if raw_name is not None: + name, err = validate_text_field(raw_name, "name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) # Duplicate name check (exclude self) if ( Role.objects.filter(organisation=org, name__iexact=name) @@ -355,13 +344,11 @@ def put(self, request, role_id, *args, **kwargs): ) role.name = name - if description is not None: - if len(str(description)) > 500: - return Response( - {"error": "Role description cannot exceed 500 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) - role.description = description + if raw_desc is not None: + description, err = validate_text_field(raw_desc, "description", max_length=500, required=False) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) + role.description = description or "" if color is not None: if len(str(color)) > 7: @@ -377,12 +364,23 @@ def put(self, request, role_id, *args, **kwargs): {"error": "Permissions must be a JSON object."}, status=status.HTTP_400_BAD_REQUEST, ) + permissions = _normalize_permissions(permissions) perm_error = _validate_permissions(permissions) if perm_error: return Response( {"error": perm_error}, status=status.HTTP_400_BAD_REQUEST, ) + + # Prevent enabling global_access on a role that has service accounts assigned + enabling_global = permissions.get("global_access", False) + if enabling_global and not role.permissions.get("global_access", False): + if ServiceAccount.objects.filter(role=role, deleted_at=None).exists(): + return Response( + {"error": "Cannot enable global access: role is currently assigned to one or more service accounts."}, + status=status.HTTP_400_BAD_REQUEST, + ) + role.permissions = permissions role.save() From 1729cba65566ae358ace16490fcf153a01314f9f Mon Sep 17 00:00:00 2001 From: Nimish Date: Tue, 7 Apr 2026 21:40:38 +0800 Subject: [PATCH 5/6] fix: apply input validation across apps, environments, and service accounts - Apps: validate name/description with validate_text_field() (rejects non-string types, strips HTML tags, enforces length limits) - Service accounts: same validation for name, plus token_name field which was previously unsanitized - Environments: unsupported HTTP methods now return 405 instead of 403 - All three views: MethodNotAllowed replaces PermissionDenied for unsupported HTTP methods --- backend/api/views/apps.py | 65 ++++++++++----------------- backend/api/views/environments.py | 6 +-- backend/api/views/service_accounts.py | 50 ++++++++------------- 3 files changed, 45 insertions(+), 76 deletions(-) diff --git a/backend/api/views/apps.py b/backend/api/views/apps.py index 9ad65ee13..89f556d4a 100644 --- a/backend/api/views/apps.py +++ b/backend/api/views/apps.py @@ -14,14 +14,14 @@ ) from api.utils.audit_logging import log_audit_event, get_actor_info, build_change_values from api.utils.environments import create_environment -from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta +from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta, validate_text_field from api.throttling import PlanBasedRateThrottle from api.utils.access.middleware import IsIPAllowed from backend.quotas import can_add_app, can_use_custom_envs from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework.response import Response from rest_framework import status from djangorestframework_camel_case.render import CamelCaseJSONRenderer @@ -53,7 +53,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -102,25 +102,15 @@ def post(self, request, *args, **kwargs): org = self._get_org(request) # --- Validate input --- - name = request.data.get("name") - if not name or not str(name).strip(): - return Response( - {"error": "Missing required field: name"}, - status=status.HTTP_400_BAD_REQUEST, - ) - name = str(name).strip() - if len(name) > 64: - return Response( - {"error": "App name cannot exceed 64 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + name, err = validate_text_field(request.data.get("name"), "name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) description = request.data.get("description", None) - if description is not None and len(str(description)) > 10000: - return Response( - {"error": "App description cannot exceed 10,000 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + if description is not None: + description, err = validate_text_field(description, "description", max_length=10000, required=False) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) # --- Validate optional environments list --- custom_envs = request.data.get("environments", None) @@ -289,7 +279,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -321,10 +311,10 @@ def get(self, request, app_id, *args, **kwargs): def put(self, request, app_id, *args, **kwargs): app = request.auth["app"] - name = request.data.get("name") - description = request.data.get("description") + raw_name = request.data.get("name") + raw_desc = request.data.get("description") - if name is None and description is None: + if raw_name is None and raw_desc is None: return Response( {"error": "At least one of 'name' or 'description' must be provided."}, status=status.HTTP_400_BAD_REQUEST, @@ -334,25 +324,16 @@ def put(self, request, app_id, *args, **kwargs): app, ["name", "description"], request.data ) - if name is not None: - if not name or str(name).strip() == "": - return Response( - {"error": "App name cannot be blank."}, - status=status.HTTP_400_BAD_REQUEST, - ) - if len(str(name)) > 64: - return Response( - {"error": "App name cannot exceed 64 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) - app.name = str(name).strip() + if raw_name is not None: + name, err = validate_text_field(raw_name, "name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) + app.name = name - if description is not None: - if len(str(description)) > 10000: - return Response( - {"error": "App description cannot exceed 10,000 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + if raw_desc is not None: + description, err = validate_text_field(raw_desc, "description", max_length=10000, required=False) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) app.description = description app.save() diff --git a/backend/api/views/environments.py b/backend/api/views/environments.py index 49a454be6..1cb5275c3 100644 --- a/backend/api/views/environments.py +++ b/backend/api/views/environments.py @@ -17,7 +17,7 @@ from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework.response import Response from rest_framework import status from djangorestframework_camel_case.render import CamelCaseJSONRenderer @@ -43,7 +43,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -176,7 +176,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False diff --git a/backend/api/views/service_accounts.py b/backend/api/views/service_accounts.py index 3eaf0e2b9..7d01ceedc 100644 --- a/backend/api/views/service_accounts.py +++ b/backend/api/views/service_accounts.py @@ -29,14 +29,14 @@ ) from api.utils.environments import _ed25519_pk_to_curve25519, _wrap_env_secrets_for_key from api.utils.audit_logging import log_audit_event, get_actor_info, build_change_values -from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta +from api.utils.rest import METHOD_TO_ACTION, get_resolver_request_meta, validate_text_field from api.utils.service_accounts import generate_server_managed_sa_keys from api.throttling import PlanBasedRateThrottle from api.utils.access.middleware import IsIPAllowed from rest_framework.views import APIView from rest_framework.permissions import IsAuthenticated -from rest_framework.exceptions import PermissionDenied +from rest_framework.exceptions import MethodNotAllowed, PermissionDenied from rest_framework.response import Response from rest_framework import status from djangorestframework_camel_case.render import CamelCaseJSONRenderer @@ -117,7 +117,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -151,18 +151,9 @@ def post(self, request, *args, **kwargs): org = self._get_org(request) # --- Validate input --- - name = request.data.get("name") - if not name or not str(name).strip(): - return Response( - {"error": "Missing required field: name"}, - status=status.HTTP_400_BAD_REQUEST, - ) - name = str(name).strip() - if len(name) > 64: - return Response( - {"error": "Service account name cannot exceed 64 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) + name, err = validate_text_field(request.data.get("name"), "name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) # --- Validate role --- role_id = request.data.get("role_id") @@ -214,7 +205,11 @@ def post(self, request, *args, **kwargs): update_stripe_subscription_seats(org) # --- Mint an initial token --- - token_name = request.data.get("token_name", "Default") + raw_token_name = request.data.get("token_name", "Default") + token_name, err = validate_text_field(raw_token_name, "token_name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) + token_name = token_name or "Default" pk, sk = get_server_keypair() keyring_json = decrypt_asymmetric( @@ -291,7 +286,7 @@ def initial(self, request, *args, **kwargs): action = METHOD_TO_ACTION.get(request.method) if not action: - raise PermissionDenied(f"Unsupported HTTP method: {request.method}") + raise MethodNotAllowed(request.method) account = None is_sa = False @@ -341,27 +336,20 @@ def put(self, request, sa_id, *args, **kwargs): old_name = sa.name old_role_name = sa.role.name if sa.role else None - name = request.data.get("name") + raw_name = request.data.get("name") role_id = request.data.get("role_id") - if name is None and role_id is None: + if raw_name is None and role_id is None: return Response( {"error": "At least one of 'name' or 'role_id' must be provided."}, status=status.HTTP_400_BAD_REQUEST, ) - if name is not None: - if not name or str(name).strip() == "": - return Response( - {"error": "Service account name cannot be blank."}, - status=status.HTTP_400_BAD_REQUEST, - ) - if len(str(name)) > 64: - return Response( - {"error": "Service account name cannot exceed 64 characters."}, - status=status.HTTP_400_BAD_REQUEST, - ) - sa.name = str(name).strip() + if raw_name is not None: + name, err = validate_text_field(raw_name, "name", max_length=64) + if err: + return Response({"error": err}, status=status.HTTP_400_BAD_REQUEST) + sa.name = name if role_id is not None: try: From a3bde333f8c509822a868fa4c26b8f3dac81b398 Mon Sep 17 00:00:00 2001 From: Nimish Date: Wed, 8 Apr 2026 17:22:00 +0800 Subject: [PATCH 6/6] fix: UnboundLocalError when updating SA role without name PUT /v1/service-accounts/:id/ with only role_id (no name) crashed with UnboundLocalError because the audit log check referenced `name` which is only bound inside the `if raw_name is not None` block. Changed to reference `raw_name` which is always bound from request.data.get(). --- backend/api/views/service_accounts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/api/views/service_accounts.py b/backend/api/views/service_accounts.py index 7d01ceedc..257b40484 100644 --- a/backend/api/views/service_accounts.py +++ b/backend/api/views/service_accounts.py @@ -371,7 +371,7 @@ def put(self, request, sa_id, *args, **kwargs): # Audit log old_vals = {} new_vals = {} - if name is not None and old_name != sa.name: + if raw_name is not None and old_name != sa.name: old_vals["name"] = old_name new_vals["name"] = sa.name if role_id is not None and old_role_name != (sa.role.name if sa.role else None):