From f7d2e77b2675807f28265c06c3c22793a0d246fb Mon Sep 17 00:00:00 2001 From: Creeper19472 Date: Tue, 16 Jun 2026 10:53:58 +0800 Subject: [PATCH 1/3] feat: add user permissions management functionality --- src/include/classes/enum/permissions.py | 1 + src/include/database/models/classic.py | 35 +++++++++++ src/include/handlers/management/user.py | 82 +++++++++++++++++++++++++ src/include/router.py | 2 + src/main.py | 1 + tests/test_client.py | 8 +++ tests/test_users.py | 22 +++++++ 7 files changed, 151 insertions(+) diff --git a/src/include/classes/enum/permissions.py b/src/include/classes/enum/permissions.py index 501ebf9..031d046 100644 --- a/src/include/classes/enum/permissions.py +++ b/src/include/classes/enum/permissions.py @@ -45,6 +45,7 @@ class Permissions(StrEnum): LIST_USERS = "list_users" MANAGE_2FA = "manage_2fa" SET_PASSWD = "set_passwd" + SET_USER_PERMISSIONS = "set_user_permissions" # 组管理 CREATE_GROUP = "create_group" diff --git a/src/include/database/models/classic.py b/src/include/database/models/classic.py index 7486c1a..b777515 100644 --- a/src/include/database/models/classic.py +++ b/src/include/database/models/classic.py @@ -363,6 +363,41 @@ def all_groups(self, new_group_list: list[str]): self.groups.append(membership) # session.commit() + @property + def own_permissions(self) -> Set[Permissions]: + now = time.time() + granted_perms = { + perm.permission + for perm in self.rights + if perm.granted and (perm.end_time is None or perm.end_time >= now) + } + revoked_perms = { + perm.permission + for perm in self.rights + if not perm.granted and (perm.end_time is None or perm.end_time >= now) + } + return granted_perms - revoked_perms + + @own_permissions.setter + def own_permissions(self, new_permission_list: list[str]): + session = object_session(self) + if not session: + raise RuntimeError() + + for old_permission in self.rights: + session.delete(old_permission) + self.rights.clear() + for new_permission in new_permission_list: + permission = UserPermission( + user=self, + username=self.username, + permission=new_permission, + start_time=time.time(), + end_time=None, + ) + session.add(permission) + self.rights.append(permission) + @cached_property def all_permissions(self) -> Set[Permissions]: now = time.time() diff --git a/src/include/handlers/management/user.py b/src/include/handlers/management/user.py index 1552107..04ed5e8 100644 --- a/src/include/handlers/management/user.py +++ b/src/include/handlers/management/user.py @@ -10,6 +10,7 @@ "RequestGetUserAvatarHandler", "RequestSetUserAvatarHandler", "RequestChangeUserGroupsHandler", + "RequestChangeUserPermissionsHandler", "RequestSetPasswdHandler", "RequestManageUserStatusHandler", ] @@ -832,6 +833,87 @@ def handle(self, handler: ConnectionHandler): handler.conclude_request(**response) +class RequestChangeUserPermissionsHandler(RequestHandler): + schema = { + "type": "object", + "properties": { + "username": {"type": "string", "minLength": 1}, + "permissions": { + "type": "array", + "items": { + "type": "string", + "additionalProperties": False, + }, + }, + }, + "required": ["username", "permissions"], + "additionalProperties": False, + } + + require_auth = True + + def handle(self, handler: ConnectionHandler): + with Session() as session: + this_user = User.get_existing(session, handler.username) + + if Permissions.SET_USER_PERMISSIONS not in this_user.all_permissions: + handler.conclude_request( + **{ + "code": 403, + "message": "You do not have permission to set user permissions", + "data": {}, + } + ) + return 403, handler.data["username"], handler.username + + target_username = handler.data["username"] + if not target_username: + handler.conclude_request( + **{ + "code": 400, + "message": "Username is required", + "data": {}, + } + ) + return + + user_to_change = session.get(User, target_username) + if not user_to_change: + handler.conclude_request( + **{ + "code": 404, + "message": "User does not exist", + "data": {}, + } + ) + return 404, target_username, handler.username + + new_permissions = handler.data.get("permissions", []) + + if not all(isinstance(permission, str) for permission in new_permissions): + handler.conclude_request( + **{ + "code": 400, + "message": "All permissions must be of type str", + "data": {}, + } + ) + return + + if set(new_permissions) != user_to_change.own_permissions: + user_to_change.own_permissions = new_permissions + session.commit() + + response = { + "code": 200, + "message": "User permissions set successfully", + "data": {}, + } + + handler.conclude_request(**response) + return 0, handler.data["username"], handler.username + + class RequestSetPasswdHandler(RequestHandler): schema = { "type": "object", diff --git a/src/include/router.py b/src/include/router.py index bf1aa07..d924bf7 100644 --- a/src/include/router.py +++ b/src/include/router.py @@ -73,6 +73,7 @@ from include.handlers.management.user import ( RequestBlockUserHandler, RequestChangeUserGroupsHandler, + RequestChangeUserPermissionsHandler, RequestCreateUserHandler, RequestDeleteUserHandler, RequestGetUserAvatarHandler, @@ -166,6 +167,7 @@ "get_user_avatar": RequestGetUserAvatarHandler, "set_user_avatar": RequestSetUserAvatarHandler, "change_user_groups": RequestChangeUserGroupsHandler, + "change_user_permissions": RequestChangeUserPermissionsHandler, "set_passwd": RequestSetPasswdHandler, # 用户组类 "list_groups": RequestListGroupsHandler, diff --git a/src/main.py b/src/main.py index 0b7b518..b4a2962 100644 --- a/src/main.py +++ b/src/main.py @@ -129,6 +129,7 @@ def server_init(): {"permission": Permissions.RENAME_USER}, {"permission": Permissions.MANAGE_USER_STATUS}, {"permission": Permissions.GET_USER_INFO}, + {"permission": Permissions.SET_USER_PERMISSIONS}, {"permission": Permissions.GET_GROUP_INFO}, {"permission": Permissions.CHANGE_USER_GROUPS}, {"permission": Permissions.SUPER_SET_PASSWD}, diff --git a/tests/test_client.py b/tests/test_client.py index acdc36d..b591ee5 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -743,6 +743,14 @@ async def get_user_info(self, username: str) -> Dict[str, Any]: """ return await self.send_request("get_user_info", {"username": username}) + async def change_user_permissions( + self, username: str, permissions: list[str] + ) -> Dict[str, Any]: + return await self.send_request( + "change_user_permissions", + {"username": username, "permissions": permissions}, + ) + async def list_users(self) -> Dict[str, Any]: """ List all users. diff --git a/tests/test_users.py b/tests/test_users.py index ddb1d22..c9254b7 100644 --- a/tests/test_users.py +++ b/tests/test_users.py @@ -78,6 +78,19 @@ async def test_get_admin_user_info(self, authenticated_client: CFMSTestClient): data = assert_success(response) assert data["username"] == "admin" + @pytest.mark.asyncio + async def test_change_user_permissions( + self, authenticated_client: CFMSTestClient, test_user: dict + ): + response = await authenticated_client.change_user_permissions( + test_user["username"], ["list_users"] + ) + assert_success(response) + + info_response = await authenticated_client.get_user_info(test_user["username"]) + data = assert_success(info_response) + assert "list_users" in data["permissions"] + class TestUserWithoutAuth: @pytest.mark.asyncio @@ -100,3 +113,12 @@ async def test_get_user_info_without_auth(self, client: CFMSTestClient): "get_user_info", {"username": "admin"}, include_auth=False ) assert_error(response, 401) + + @pytest.mark.asyncio + async def test_change_user_permissions_without_auth(self, client: CFMSTestClient): + response = await client.send_request( + "change_user_permissions", + {"username": "admin", "permissions": []}, + include_auth=False, + ) + assert_error(response, 401) From d93b1ddf25d5e42e278d9a0073d3e546a4b71d83 Mon Sep 17 00:00:00 2001 From: Creeper19472 Date: Tue, 16 Jun 2026 11:01:28 +0800 Subject: [PATCH 2/3] feat: enhance permission management with utility functions for grants and revocations --- src/include/database/models/classic.py | 161 ++++++++++++------------- 1 file changed, 77 insertions(+), 84 deletions(-) diff --git a/src/include/database/models/classic.py b/src/include/database/models/classic.py index b777515..ea048dd 100644 --- a/src/include/database/models/classic.py +++ b/src/include/database/models/classic.py @@ -1,7 +1,7 @@ import secrets import time from functools import cached_property -from typing import TYPE_CHECKING, List, Optional, Set, cast +from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Set, cast import jwt import orjson @@ -43,6 +43,51 @@ from include.database.models.keyring import UserKey +def _permission_grants_and_revocations( + permission_entries: Iterable[Any], now: Optional[float] = None +) -> tuple[set, set]: + if now is None: + now = time.time() + + granted_permissions = set() + revoked_permissions = set() + + for entry in permission_entries: + if entry.end_time is not None and entry.end_time < now: + continue + + target = granted_permissions if entry.granted else revoked_permissions + target.add(entry.permission) + + return granted_permissions, revoked_permissions + + +def _effective_permissions( + permission_entries: Iterable[Any], now: Optional[float] = None +) -> set: + granted_permissions, revoked_permissions = _permission_grants_and_revocations( + permission_entries, now + ) + return granted_permissions - revoked_permissions + + +def _replace_permission_entries( + session, + current_entries: list[Any], + new_permission_list: list[str], + create_entry: Callable[[str, float], Any], +) -> None: + for old_permission in list(current_entries): + session.delete(old_permission) + current_entries.clear() + + now = time.time() + for permission_name in new_permission_list: + permission = create_entry(permission_name, now) + session.add(permission) + current_entries.append(permission) + + class User(Base): __tablename__ = "users" # id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) @@ -365,18 +410,7 @@ def all_groups(self, new_group_list: list[str]): @property def own_permissions(self) -> Set[Permissions]: - now = time.time() - granted_perms = { - perm.permission - for perm in self.rights - if perm.granted and (perm.end_time is None or perm.end_time >= now) - } - revoked_perms = { - perm.permission - for perm in self.rights - if not perm.granted and (perm.end_time is None or perm.end_time >= now) - } - return granted_perms - revoked_perms + return _effective_permissions(self.rights) @own_permissions.setter def own_permissions(self, new_permission_list: list[str]): @@ -384,32 +418,27 @@ def own_permissions(self, new_permission_list: list[str]): if not session: raise RuntimeError() - for old_permission in self.rights: - session.delete(old_permission) - self.rights.clear() - for new_permission in new_permission_list: - permission = UserPermission( + _replace_permission_entries( + session, + self.rights, + new_permission_list, + lambda permission, now: UserPermission( user=self, username=self.username, - permission=new_permission, - start_time=time.time(), + permission=permission, + start_time=now, end_time=None, - ) - session.add(permission) - self.rights.append(permission) + ), + ) @cached_property def all_permissions(self) -> Set[Permissions]: now = time.time() - # 用户自身有效权限 - user_perms = { - perm.permission - for perm in self.rights - if perm.granted and (perm.end_time is None or perm.end_time >= now) - } - # 用户组有效权限与剥夺权限 + user_granted_perms, revoked_perms = _permission_grants_and_revocations( + self.rights, now + ) group_granted_perms = set() - group_revoked_perms = set() + for membership in getattr(self, "groups", []): membership: UserMembership # 检查用户组的起止时间 @@ -423,26 +452,18 @@ def all_permissions(self) -> Set[Permissions]: with Session() as session: group = session.get(UserGroup, membership.group_name) if group: - for perm in group.permissions: - if perm.end_time is None or perm.end_time >= now: - if perm.granted: - group_granted_perms.add(perm.permission) - else: - group_revoked_perms.add(perm.permission) + group_grants, group_revocations = ( + _permission_grants_and_revocations(group.permissions, now) + ) + group_granted_perms |= group_grants + revoked_perms |= group_revocations else: raise ValueError( f"UserMembership {membership.id} does not have a valid group_name attribute." ) - # 合并 - all_perms = user_perms | group_granted_perms - # 再减去被剥夺的权限(包括用户自身和用户组) - revoked_perms = { - perm.permission - for perm in self.rights - if not perm.granted and (perm.end_time is None or perm.end_time >= now) - } - revoked_perms |= group_revoked_perms + + all_perms = user_granted_perms | group_granted_perms return (all_perms - revoked_perms) if (all_perms or revoked_perms) else set() @@ -538,34 +559,7 @@ class UserGroup(Base): @property def all_permissions(self) -> Set[str]: - """ - 该属性的实现是对 User.all_permissions 的复制。 - """ - - now = time.time() - # 用户组自身有效权限 - group_granted_perms = { - perm.permission - for perm in self.permissions - if perm.granted and (perm.end_time is None or perm.end_time >= now) - } - # 用户组剥夺权限 - group_revoked_perms = set() - - for perm in self.permissions: - if perm.end_time is None or perm.end_time >= now: - if perm.granted: - group_granted_perms.add(perm.permission) - else: - group_revoked_perms.add(perm.permission) - # 合并 - all_perms = group_granted_perms - # 再减去被剥夺的权限 - return ( - (all_perms - group_revoked_perms) - if (all_perms or group_revoked_perms) - else set() - ) + return _effective_permissions(self.permissions) @all_permissions.setter def all_permissions(self, new_permission_list: list[str]): @@ -573,19 +567,18 @@ def all_permissions(self, new_permission_list: list[str]): if not session: raise RuntimeError() - for old_permission in self.permissions: - session.delete(old_permission) - self.permissions.clear() - for new_permission in new_permission_list: - permission = UserGroupPermission( + _replace_permission_entries( + session, + self.permissions, + new_permission_list, + lambda permission, now: UserGroupPermission( group=self, group_name=self.group_name, - permission=new_permission, - start_time=time.time(), + permission=permission, + start_time=now, end_time=None, - ) - session.add(permission) - self.permissions.append(permission) + ), + ) @property def members(self) -> set[str]: From 4e21929dbe629fec83f9a6c9ae3c75c3cfe6afee Mon Sep 17 00:00:00 2001 From: Creeper19472 Date: Tue, 16 Jun 2026 11:18:44 +0800 Subject: [PATCH 3/3] feat: add test for permission restriction on changing user permissions --- tests/test_users.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/test_users.py b/tests/test_users.py index c9254b7..bfa4613 100644 --- a/tests/test_users.py +++ b/tests/test_users.py @@ -91,6 +91,33 @@ async def test_change_user_permissions( data = assert_success(info_response) assert "list_users" in data["permissions"] + @pytest.mark.asyncio + async def test_change_user_permissions_requires_set_user_permissions( + self, + authenticated_client: CFMSTestClient, + unauthenticated_client: CFMSTestClient, + user_factory, + ): + operator_user = await user_factory() + target_user = await user_factory() + + login_response = await unauthenticated_client.login( + operator_user["username"], operator_user["password"] + ) + assert_success(login_response) + + response = await unauthenticated_client.change_user_permissions( + target_user["username"], ["list_users"] + ) + error = assert_error(response, 403) + assert error["message"] == "You do not have permission to set user permissions" + + info_response = await authenticated_client.get_user_info( + target_user["username"] + ) + data = assert_success(info_response) + assert "list_users" not in data["permissions"] + class TestUserWithoutAuth: @pytest.mark.asyncio