Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/classes/enum/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Permissions(StrEnum):
LIST_USERS = "list_users"
MANAGE_2FA = "manage_2fa"
SET_PASSWD = "set_passwd"
SET_USER_PERMISSIONS = "set_user_permissions"

# 组管理
CREATE_GROUP = "create_group"
Expand Down
152 changes: 90 additions & 62 deletions src/include/database/models/classic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import secrets
import time
from functools import cached_property
from typing import TYPE_CHECKING, List, Optional, Set, cast
from typing import TYPE_CHECKING, Any, Callable, Iterable, List, Optional, Set, cast

import jwt
import orjson
Expand Down Expand Up @@ -43,6 +43,51 @@
from include.database.models.keyring import UserKey


def _permission_grants_and_revocations(
permission_entries: Iterable[Any], now: Optional[float] = None
) -> tuple[set, set]:
if now is None:
now = time.time()

granted_permissions = set()
revoked_permissions = set()

for entry in permission_entries:
if entry.end_time is not None and entry.end_time < now:
continue

target = granted_permissions if entry.granted else revoked_permissions
target.add(entry.permission)

return granted_permissions, revoked_permissions


def _effective_permissions(
permission_entries: Iterable[Any], now: Optional[float] = None
) -> set:
granted_permissions, revoked_permissions = _permission_grants_and_revocations(
permission_entries, now
)
return granted_permissions - revoked_permissions


def _replace_permission_entries(
session,
current_entries: list[Any],
new_permission_list: list[str],
create_entry: Callable[[str, float], Any],
) -> None:
for old_permission in list(current_entries):
session.delete(old_permission)
current_entries.clear()

now = time.time()
for permission_name in new_permission_list:
permission = create_entry(permission_name, now)
session.add(permission)
current_entries.append(permission)


class User(Base):
__tablename__ = "users"
# id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
Expand Down Expand Up @@ -363,18 +408,37 @@ def all_groups(self, new_group_list: list[str]):
self.groups.append(membership)
# session.commit()

@property
def own_permissions(self) -> Set[Permissions]:
return _effective_permissions(self.rights)

@own_permissions.setter
def own_permissions(self, new_permission_list: list[str]):
session = object_session(self)
if not session:
raise RuntimeError()

_replace_permission_entries(
session,
self.rights,
new_permission_list,
lambda permission, now: UserPermission(
user=self,
username=self.username,
permission=permission,
start_time=now,
end_time=None,
),
)

@cached_property
def all_permissions(self) -> Set[Permissions]:
now = time.time()
# 用户自身有效权限
user_perms = {
perm.permission
for perm in self.rights
if perm.granted and (perm.end_time is None or perm.end_time >= now)
}
# 用户组有效权限与剥夺权限
user_granted_perms, revoked_perms = _permission_grants_and_revocations(
self.rights, now
)
group_granted_perms = set()
group_revoked_perms = set()

for membership in getattr(self, "groups", []):
membership: UserMembership
# 检查用户组的起止时间
Expand All @@ -388,26 +452,18 @@ def all_permissions(self) -> Set[Permissions]:
with Session() as session:
group = session.get(UserGroup, membership.group_name)
if group:
for perm in group.permissions:
if perm.end_time is None or perm.end_time >= now:
if perm.granted:
group_granted_perms.add(perm.permission)
else:
group_revoked_perms.add(perm.permission)
group_grants, group_revocations = (
_permission_grants_and_revocations(group.permissions, now)
)
group_granted_perms |= group_grants
revoked_perms |= group_revocations

else:
raise ValueError(
f"UserMembership {membership.id} does not have a valid group_name attribute."
)
# 合并
all_perms = user_perms | group_granted_perms
# 再减去被剥夺的权限(包括用户自身和用户组)
revoked_perms = {
perm.permission
for perm in self.rights
if not perm.granted and (perm.end_time is None or perm.end_time >= now)
}
revoked_perms |= group_revoked_perms

all_perms = user_granted_perms | group_granted_perms
return (all_perms - revoked_perms) if (all_perms or revoked_perms) else set()


Expand Down Expand Up @@ -503,54 +559,26 @@ class UserGroup(Base):

@property
def all_permissions(self) -> Set[str]:
"""
该属性的实现是对 User.all_permissions 的复制。
"""

now = time.time()
# 用户组自身有效权限
group_granted_perms = {
perm.permission
for perm in self.permissions
if perm.granted and (perm.end_time is None or perm.end_time >= now)
}
# 用户组剥夺权限
group_revoked_perms = set()

for perm in self.permissions:
if perm.end_time is None or perm.end_time >= now:
if perm.granted:
group_granted_perms.add(perm.permission)
else:
group_revoked_perms.add(perm.permission)
# 合并
all_perms = group_granted_perms
# 再减去被剥夺的权限
return (
(all_perms - group_revoked_perms)
if (all_perms or group_revoked_perms)
else set()
)
return _effective_permissions(self.permissions)

@all_permissions.setter
def all_permissions(self, new_permission_list: list[str]):
session = object_session(self)
if not session:
raise RuntimeError()

for old_permission in self.permissions:
session.delete(old_permission)
self.permissions.clear()
for new_permission in new_permission_list:
permission = UserGroupPermission(
_replace_permission_entries(
session,
self.permissions,
new_permission_list,
lambda permission, now: UserGroupPermission(
group=self,
group_name=self.group_name,
permission=new_permission,
start_time=time.time(),
permission=permission,
start_time=now,
end_time=None,
)
session.add(permission)
self.permissions.append(permission)
),
)

@property
def members(self) -> set[str]:
Expand Down
82 changes: 82 additions & 0 deletions src/include/handlers/management/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"RequestGetUserAvatarHandler",
"RequestSetUserAvatarHandler",
"RequestChangeUserGroupsHandler",
"RequestChangeUserPermissionsHandler",
"RequestSetPasswdHandler",
"RequestManageUserStatusHandler",
]
Expand Down Expand Up @@ -832,6 +833,87 @@ def handle(self, handler: ConnectionHandler):
handler.conclude_request(**response)


class RequestChangeUserPermissionsHandler(RequestHandler):
schema = {
"type": "object",
"properties": {
"username": {"type": "string", "minLength": 1},
"permissions": {
"type": "array",
"items": {
"type": "string",
"additionalProperties": False,
},
},
},
"required": ["username", "permissions"],
"additionalProperties": False,
}

require_auth = True

def handle(self, handler: ConnectionHandler):
with Session() as session:
this_user = User.get_existing(session, handler.username)

if Permissions.SET_USER_PERMISSIONS not in this_user.all_permissions:
handler.conclude_request(
**{
"code": 403,
"message": "You do not have permission to set user permissions",
"data": {},
}
)
return 403, handler.data["username"], handler.username

target_username = handler.data["username"]
if not target_username:
handler.conclude_request(
**{
"code": 400,
"message": "Username is required",
"data": {},
}
)
return

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Validate incoming permission names against the Permissions enum instead of accepting arbitrary strings.

Currently we only check that new_permissions items are strings, so arbitrary or misspelled permissions can be stored and then never enforced. Please validate each entry against the Permissions enum (e.g., casting with try/except or checking against a whitelist) and return a 4xx for unknown values to prevent inconsistent or ineffective permission assignments.

Suggested implementation:

            new_permissions = handler.data.get("permissions", [])

            # Validate that all permissions are strings
            if not all(isinstance(permission, str) for permission in new_permissions):
                handler.conclude_request(
                    **{
                        "code": 400,
                        "message": "All permissions must be of type str",
                        "data": {},
                    }
                )
                return

            # Validate that all permissions map to known Permissions enum values
            valid_permissions = {permission.value for permission in Permissions}
            invalid_permissions = [
                permission
                for permission in new_permissions
                if permission not in valid_permissions
            ]

            if invalid_permissions:
                handler.conclude_request(
                    **{
                        "code": 400,
                        "message": "Unknown permissions: " + ", ".join(invalid_permissions),
                        "data": {},
                    }
                )
                return
  1. Ensure the Permissions enum is imported in src/include/handlers/management/user.py. For example (adjust path/naming to your project structure):

    from src.include.models.permissions import Permissions

  2. If your Permissions enum uses names instead of values for external representation, replace permission.value with permission.name in valid_permissions = {permission.value for permission in Permissions}.

  3. If you prefer a different error format (e.g., structured data detailing invalid permissions), adjust the "message" and "data" fields accordingly but keep the 4xx status code.


user_to_change = session.get(User, target_username)
if not user_to_change:
handler.conclude_request(
**{
"code": 404,
"message": "User does not exist",
"data": {},
}
)
return 404, target_username, handler.username

new_permissions = handler.data.get("permissions", [])

if not all(isinstance(permission, str) for permission in new_permissions):
handler.conclude_request(
**{
"code": 400,
"message": "All permissions must be of type str",
"data": {},
}
)
return

if set(new_permissions) != user_to_change.own_permissions:
user_to_change.own_permissions = new_permissions
session.commit()

response = {
"code": 200,
"message": "User permissions set successfully",
"data": {},
}

handler.conclude_request(**response)
return 0, handler.data["username"], handler.username


class RequestSetPasswdHandler(RequestHandler):
schema = {
"type": "object",
Expand Down
2 changes: 2 additions & 0 deletions src/include/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
from include.handlers.management.user import (
RequestBlockUserHandler,
RequestChangeUserGroupsHandler,
RequestChangeUserPermissionsHandler,
RequestCreateUserHandler,
RequestDeleteUserHandler,
RequestGetUserAvatarHandler,
Expand Down Expand Up @@ -166,6 +167,7 @@
"get_user_avatar": RequestGetUserAvatarHandler,
"set_user_avatar": RequestSetUserAvatarHandler,
"change_user_groups": RequestChangeUserGroupsHandler,
"change_user_permissions": RequestChangeUserPermissionsHandler,
"set_passwd": RequestSetPasswdHandler,
# 用户组类
"list_groups": RequestListGroupsHandler,
Expand Down
1 change: 1 addition & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def server_init():
{"permission": Permissions.RENAME_USER},
{"permission": Permissions.MANAGE_USER_STATUS},
{"permission": Permissions.GET_USER_INFO},
{"permission": Permissions.SET_USER_PERMISSIONS},
{"permission": Permissions.GET_GROUP_INFO},
{"permission": Permissions.CHANGE_USER_GROUPS},
{"permission": Permissions.SUPER_SET_PASSWD},
Expand Down
8 changes: 8 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,14 @@ async def get_user_info(self, username: str) -> Dict[str, Any]:
"""
return await self.send_request("get_user_info", {"username": username})

async def change_user_permissions(
self, username: str, permissions: list[str]
) -> Dict[str, Any]:
return await self.send_request(
"change_user_permissions",
{"username": username, "permissions": permissions},
)

async def list_users(self) -> Dict[str, Any]:
"""
List all users.
Expand Down
Loading