From d481aefa4f64a44d3eaa71f02a877437502eff2d Mon Sep 17 00:00:00 2001 From: Mohammad Date: Fri, 22 May 2026 15:52:18 +0330 Subject: [PATCH 01/10] feat(admin-role): add HWID policy to admin roles and update related models and tests --- app/db/crud/admin_role.py | 3 + ...9d34a1f0_add_hwid_policy_to_admin_roles.py | 38 +++++++++++ app/db/models.py | 7 ++ app/models/admin.py | 3 +- app/models/admin_role.py | 10 +++ app/operation/subscription.py | 44 ++++++++++-- tests/api/test_admin_role.py | 28 +++++++- tests/api/test_hwid.py | 67 ++++++++++++++++++- 8 files changed, 190 insertions(+), 10 deletions(-) create mode 100644 app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py diff --git a/app/db/crud/admin_role.py b/app/db/crud/admin_role.py index a5e0813a..8d4b8e2c 100644 --- a/app/db/crud/admin_role.py +++ b/app/db/crud/admin_role.py @@ -52,6 +52,7 @@ async def create_role(db: AsyncSession, data: AdminRoleCreate) -> AdminRole: limits=data.limits.model_dump(), features=data.features.model_dump(), access=data.access.model_dump(), + hwid=data.hwid.model_dump(), disabled_when_limited=data.disabled_when_limited, disable_users_when_limited=data.disable_users_when_limited, ) @@ -74,6 +75,8 @@ async def modify_role(db: AsyncSession, role: AdminRole, data: AdminRoleModify) role.features = data.features.model_dump() if data.access is not None: role.access = data.access.model_dump() + if data.hwid is not None: + role.hwid = data.hwid.model_dump() if data.disabled_when_limited is not None: role.disabled_when_limited = data.disabled_when_limited if data.disable_users_when_limited is not None: diff --git a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py new file mode 100644 index 00000000..3b707193 --- /dev/null +++ b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py @@ -0,0 +1,38 @@ +"""add hwid policy to admin roles + +Revision ID: 2c6e9d34a1f0 +Revises: bb4a32b7f5ce +Create Date: 2026-05-22 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "2c6e9d34a1f0" +down_revision = "bb4a32b7f5ce" +branch_labels = None +depends_on = None + +def upgrade() -> None: + with op.batch_alter_table("admin_roles", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "hwid", + sa.JSON(), + nullable=True, + ) + ) + + # Cross-dialect safe JSON initialization (SQLite/MySQL/PostgreSQL) + op.execute("UPDATE admin_roles SET hwid = '{\"enabled\":\"use_panel\",\"forced\":false}' WHERE hwid IS NULL") + + with op.batch_alter_table("admin_roles", schema=None) as batch_op: + batch_op.alter_column("hwid", existing_type=sa.JSON(), type_=sa.JSON(), nullable=False) + + +def downgrade() -> None: + with op.batch_alter_table("admin_roles", schema=None) as batch_op: + batch_op.drop_column("hwid") diff --git a/app/db/models.py b/app/db/models.py index 35f5936c..1bbe104b 100644 --- a/app/db/models.py +++ b/app/db/models.py @@ -69,6 +69,12 @@ class AdminStatus(str, Enum): limited = "limited" +class HWIDPolicy(str, Enum): + use_panel = "use_panel" + on = "on" + off = "off" + + class Admin(Base): __tablename__ = "admins" @@ -886,6 +892,7 @@ class AdminRole(Base): limits: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) features: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) access: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) + hwid: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) disabled_when_limited: Mapped[bool] = mapped_column(default=False, server_default="0") disable_users_when_limited: Mapped[bool] = mapped_column(default=True, server_default="1") created_at: Mapped[dt] = mapped_column(DateTime(timezone=True), default_factory=lambda: dt.now(tz.utc), init=False) diff --git a/app/models/admin.py b/app/models/admin.py index 4c90f46f..19aba53b 100644 --- a/app/models/admin.py +++ b/app/models/admin.py @@ -9,7 +9,7 @@ from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator from app.db.models import AdminStatus -from app.models.admin_role import RoleAccess, RoleFeatures, RoleLimits, RolePermissions +from app.models.admin_role import RoleAccess, RoleFeatures, RoleHWIDSettings, RoleLimits, RolePermissions from app.models.stats import Period from app.utils.helpers import fix_datetime_timezone @@ -58,6 +58,7 @@ class AdminRoleData(BaseModel): limits: RoleLimits = Field(default_factory=RoleLimits) features: RoleFeatures = Field(default_factory=RoleFeatures) access: RoleAccess = Field(default_factory=RoleAccess) + hwid: RoleHWIDSettings = Field(default_factory=RoleHWIDSettings) disabled_when_limited: bool = False disable_users_when_limited: bool = True diff --git a/app/models/admin_role.py b/app/models/admin_role.py index 4bef7f7a..256dca51 100644 --- a/app/models/admin_role.py +++ b/app/models/admin_role.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator +from app.db.models import HWIDPolicy from app.models.validators import ListValidator @@ -107,6 +108,13 @@ class RoleAccess(BaseModel): model_config = ConfigDict(from_attributes=True) +class RoleHWIDSettings(BaseModel): + enabled: HWIDPolicy = HWIDPolicy.use_panel + forced: bool = False + + model_config = ConfigDict(from_attributes=True) + + class RolePermissions(BaseModel): """ Typed permission map. Missing resource or action = denied. @@ -139,6 +147,7 @@ class AdminRoleBase(BaseModel): limits: RoleLimits = Field(default_factory=RoleLimits) features: RoleFeatures = Field(default_factory=RoleFeatures) access: RoleAccess = Field(default_factory=RoleAccess) + hwid: RoleHWIDSettings = Field(default_factory=RoleHWIDSettings) disabled_when_limited: bool = False disable_users_when_limited: bool = True @@ -155,6 +164,7 @@ class AdminRoleModify(BaseModel): limits: RoleLimits | None = None features: RoleFeatures | None = None access: RoleAccess | None = None + hwid: RoleHWIDSettings | None = None disabled_when_limited: bool | None = None disable_users_when_limited: bool | None = None diff --git a/app/operation/subscription.py b/app/operation/subscription.py index 7f6f978e..e802b5ea 100644 --- a/app/operation/subscription.py +++ b/app/operation/subscription.py @@ -12,8 +12,9 @@ register_user_hwid, ) from app.db.crud.user import get_user_usages, user_sub_update -from app.db.models import User +from app.db.models import HWIDPolicy, User from app.models.admin import AdminDetails +from app.models.admin_role import RoleHWIDSettings from app.models.settings import Application, ConfigFormat, HWIDSettings, SubRule, Subscription as SubSettings from app.models.stats import UserUsageStatsList from app.models.subscription import SubscriptionUsageQuery @@ -281,17 +282,36 @@ async def validate_and_register_hwid( db: AsyncSession, user_id: int, user_hwid_limit: int | None, + role_hwid_settings: RoleHWIDSettings | dict | None, x_hwid: str | None, x_device_os: str | None, x_ver_os: str | None, x_device_model: str | None, ): hwid_conf: HWIDSettings = await hwid_settings() - if not hwid_conf.enabled: + + if isinstance(role_hwid_settings, dict): + try: + role_hwid_settings = RoleHWIDSettings.model_validate(role_hwid_settings) + except Exception: + role_hwid_settings = None + + role_enabled = role_hwid_settings.enabled if role_hwid_settings else HWIDPolicy.use_panel + role_forced = role_hwid_settings.forced if role_hwid_settings else None + + effective_enabled = hwid_conf.enabled + if role_enabled == HWIDPolicy.on: + effective_enabled = True + elif role_enabled == HWIDPolicy.off: + effective_enabled = False + + if not effective_enabled: return + effective_forced = hwid_conf.forced if role_forced is None else role_forced + if not x_hwid: - if hwid_conf.forced: + if effective_forced: await self.raise_error(message="HWID header required", code=403) return @@ -360,7 +380,14 @@ async def user_subscription( ) else: await self.validate_and_register_hwid( - db, db_user.id, db_user.hwid_limit, x_hwid, x_device_os, x_ver_os, x_device_model + db, + db_user.id, + db_user.hwid_limit, + db_user.admin.role.hwid if db_user.admin and db_user.admin.role else None, + x_hwid, + x_device_os, + x_ver_os, + x_device_model, ) matched_rule = self.detect_client_rule(user_agent, sub_settings.rules) client_type = matched_rule.target if matched_rule else None @@ -443,7 +470,14 @@ async def user_subscription_with_client_type( user = await self.validated_user(db_user) await self.validate_and_register_hwid( - db, db_user.id, db_user.hwid_limit, x_hwid, x_device_os, x_ver_os, x_device_model + db, + db_user.id, + db_user.hwid_limit, + db_user.admin.role.hwid if db_user.admin and db_user.admin.role else None, + x_hwid, + x_device_os, + x_ver_os, + x_device_model, ) response_headers = self.create_response_headers( diff --git a/tests/api/test_admin_role.py b/tests/api/test_admin_role.py index 48854a76..17e21579 100644 --- a/tests/api/test_admin_role.py +++ b/tests/api/test_admin_role.py @@ -7,10 +7,9 @@ from app.db.models import Admin from app.models.admin import hash_password as _hash_password -from tests.api import client, TestSession +from tests.api import TestSession, client from tests.api.helpers import auth_headers, create_admin, delete_admin, unique_name - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -30,6 +29,7 @@ def _role_payload(name: str | None = None) -> dict: }, "features": {"can_use_reset_strategy": True, "can_use_next_plan": True}, "access": {"require_template": False, "allowed_template_ids": None, "allowed_group_ids": None}, + "hwid": {"enabled": "use_panel", "forced": False}, } @@ -193,9 +193,33 @@ def test_create_role(access_token): data = response.json() assert data["name"] == name assert data["is_owner"] is False + assert data["hwid"] == {"enabled": "use_panel", "forced": False} _delete_role(access_token, data["id"]) +def test_create_and_modify_role_hwid_policy(access_token): + """Owner can set and update per-role HWID policy.""" + payload = _role_payload() + payload["hwid"] = {"enabled": "off", "forced": True} + + response = client.post("/api/admin-role", headers=auth_headers(access_token), json=payload) + assert response.status_code == status.HTTP_201_CREATED + role = response.json() + + try: + assert role["hwid"] == {"enabled": "off", "forced": True} + + update_response = client.put( + f"/api/admin-role/{role['id']}", + headers=auth_headers(access_token), + json={"hwid": {"enabled": "on", "forced": False}}, + ) + assert update_response.status_code == status.HTTP_200_OK + assert update_response.json()["hwid"] == {"enabled": "on", "forced": False} + finally: + _delete_role(access_token, role["id"]) + + def test_create_role_duplicate_name_returns_409(access_token): """Creating a role with a duplicate name returns 409.""" role = _create_role(access_token) diff --git a/tests/api/test_hwid.py b/tests/api/test_hwid.py index e66e8bee..29aaae6a 100644 --- a/tests/api/test_hwid.py +++ b/tests/api/test_hwid.py @@ -4,12 +4,14 @@ from app.db.crud.hwid import register_user_hwid, reset_user_hwids from app.db.models import UserHWID -from tests.api import TestSession -from tests.api import client +from tests.api import TestSession, client from tests.api.helpers import ( auth_headers, + create_admin, create_user, + delete_admin, delete_user, + unique_name, ) @@ -120,3 +122,64 @@ def test_hwid_workflow(access_token): finally: delete_user(access_token, user["username"]) + + +def test_hwid_respects_admin_role_policy(access_token): + def _login(username: str, password: str) -> str: + response = client.post( + "/api/admin/token", + data={"username": username, "password": password, "grant_type": "password"}, + ) + assert response.status_code == status.HTTP_200_OK + return response.json()["access_token"] + + def _create_role(hwid_policy: str) -> dict: + payload = { + "name": unique_name(f"role_hwid_{hwid_policy}"), + "permissions": { + "users": {"create": True, "read": {"scope": 2}, "delete": {"scope": 2}}, + }, + "limits": {}, + "features": {}, + "access": {}, + "hwid": {"enabled": hwid_policy, "forced": False}, + } + response = client.post("/api/admin-role", headers=auth_headers(access_token), json=payload) + assert response.status_code == status.HTTP_201_CREATED + return response.json() + + role_off = _create_role("off") + role_on = _create_role("on") + admin_off = create_admin(access_token, role_id=role_off["id"]) + admin_on = create_admin(access_token, role_id=role_on["id"]) + user_off = None + user_on = None + + try: + off_token = _login(admin_off["username"], admin_off["password"]) + on_token = _login(admin_on["username"], admin_on["password"]) + + user_off = create_user(off_token) + user_on = create_user(on_token) + + off_sub_response = client.get(user_off["subscription_url"], headers={"X-HWID": "off-device"}) + assert off_sub_response.status_code == status.HTTP_200_OK + + on_sub_response = client.get(user_on["subscription_url"], headers={"X-HWID": "on-device"}) + assert on_sub_response.status_code == status.HTTP_200_OK + + off_hwids = client.get(f"/api/user/{user_off['id']}/hwids", headers=auth_headers(access_token)).json() + on_hwids = client.get(f"/api/user/{user_on['id']}/hwids", headers=auth_headers(access_token)).json() + + assert off_hwids["count"] == 0 + assert on_hwids["count"] == 1 + assert on_hwids["hwids"][0]["hwid"] == "on-device" + finally: + if user_off is not None: + delete_user(access_token, user_off["username"]) + if user_on is not None: + delete_user(access_token, user_on["username"]) + delete_admin(access_token, admin_off["username"]) + delete_admin(access_token, admin_on["username"]) + client.delete(f"/api/admin-role/{role_off['id']}", headers=auth_headers(access_token)) + client.delete(f"/api/admin-role/{role_on['id']}", headers=auth_headers(access_token)) From 7a4b01bb9b361744d1a880dbfb7e191cb624ffb1 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Fri, 22 May 2026 16:40:59 +0330 Subject: [PATCH 02/10] feat(hwid): update HWID policy structure and defaults in admin roles, models, and tests --- app/db/crud/admin_role.py | 2 +- ...9d34a1f0_add_hwid_policy_to_admin_roles.py | 6 +- app/db/models.py | 17 ++-- app/models/admin.py | 5 +- app/models/admin_role.py | 13 +-- app/operation/subscription.py | 43 +++++----- app/operation/user.py | 79 ++++++++++++++----- tests/api/test_admin_role.py | 36 +++++++-- tests/api/test_hwid.py | 10 +-- 9 files changed, 138 insertions(+), 73 deletions(-) diff --git a/app/db/crud/admin_role.py b/app/db/crud/admin_role.py index 8d4b8e2c..df73cc7a 100644 --- a/app/db/crud/admin_role.py +++ b/app/db/crud/admin_role.py @@ -52,7 +52,7 @@ async def create_role(db: AsyncSession, data: AdminRoleCreate) -> AdminRole: limits=data.limits.model_dump(), features=data.features.model_dump(), access=data.access.model_dump(), - hwid=data.hwid.model_dump(), + hwid=data.hwid.model_dump() if data.hwid is not None else {}, disabled_when_limited=data.disabled_when_limited, disable_users_when_limited=data.disable_users_when_limited, ) diff --git a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py index 3b707193..4883e63e 100644 --- a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py +++ b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py @@ -27,7 +27,11 @@ def upgrade() -> None: ) # Cross-dialect safe JSON initialization (SQLite/MySQL/PostgreSQL) - op.execute("UPDATE admin_roles SET hwid = '{\"enabled\":\"use_panel\",\"forced\":false}' WHERE hwid IS NULL") + op.execute( + "UPDATE admin_roles " + "SET hwid = '{\"enabled\":false,\"forced\":false,\"fallback_limit\":0,\"min_limit\":0,\"max_limit\":0}' " + "WHERE hwid IS NULL" + ) with op.batch_alter_table("admin_roles", schema=None) as batch_op: batch_op.alter_column("hwid", existing_type=sa.JSON(), type_=sa.JSON(), nullable=False) diff --git a/app/db/models.py b/app/db/models.py index 1bbe104b..f361ecca 100644 --- a/app/db/models.py +++ b/app/db/models.py @@ -69,12 +69,6 @@ class AdminStatus(str, Enum): limited = "limited" -class HWIDPolicy(str, Enum): - use_panel = "use_panel" - on = "on" - off = "off" - - class Admin(Base): __tablename__ = "admins" @@ -892,7 +886,16 @@ class AdminRole(Base): limits: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) features: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) access: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) - hwid: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) + hwid: Mapped[Dict] = mapped_column( + PostgresJSONB, + default_factory=lambda: { + "enabled": False, + "forced": False, + "fallback_limit": 0, + "min_limit": 0, + "max_limit": 0, + }, + ) disabled_when_limited: Mapped[bool] = mapped_column(default=False, server_default="0") disable_users_when_limited: Mapped[bool] = mapped_column(default=True, server_default="1") created_at: Mapped[dt] = mapped_column(DateTime(timezone=True), default_factory=lambda: dt.now(tz.utc), init=False) diff --git a/app/models/admin.py b/app/models/admin.py index 19aba53b..956401a1 100644 --- a/app/models/admin.py +++ b/app/models/admin.py @@ -9,7 +9,8 @@ from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator from app.db.models import AdminStatus -from app.models.admin_role import RoleAccess, RoleFeatures, RoleHWIDSettings, RoleLimits, RolePermissions +from app.models.admin_role import RoleAccess, RoleFeatures, RoleLimits, RolePermissions +from app.models.settings import HWIDSettings from app.models.stats import Period from app.utils.helpers import fix_datetime_timezone @@ -58,7 +59,7 @@ class AdminRoleData(BaseModel): limits: RoleLimits = Field(default_factory=RoleLimits) features: RoleFeatures = Field(default_factory=RoleFeatures) access: RoleAccess = Field(default_factory=RoleAccess) - hwid: RoleHWIDSettings = Field(default_factory=RoleHWIDSettings) + hwid: HWIDSettings | None = None disabled_when_limited: bool = False disable_users_when_limited: bool = True diff --git a/app/models/admin_role.py b/app/models/admin_role.py index 256dca51..0ef486d4 100644 --- a/app/models/admin_role.py +++ b/app/models/admin_role.py @@ -4,7 +4,7 @@ from pydantic import BaseModel, ConfigDict, Field, field_validator -from app.db.models import HWIDPolicy +from app.models.settings import HWIDSettings from app.models.validators import ListValidator @@ -108,13 +108,6 @@ class RoleAccess(BaseModel): model_config = ConfigDict(from_attributes=True) -class RoleHWIDSettings(BaseModel): - enabled: HWIDPolicy = HWIDPolicy.use_panel - forced: bool = False - - model_config = ConfigDict(from_attributes=True) - - class RolePermissions(BaseModel): """ Typed permission map. Missing resource or action = denied. @@ -147,7 +140,7 @@ class AdminRoleBase(BaseModel): limits: RoleLimits = Field(default_factory=RoleLimits) features: RoleFeatures = Field(default_factory=RoleFeatures) access: RoleAccess = Field(default_factory=RoleAccess) - hwid: RoleHWIDSettings = Field(default_factory=RoleHWIDSettings) + hwid: HWIDSettings = Field(default_factory=HWIDSettings) disabled_when_limited: bool = False disable_users_when_limited: bool = True @@ -164,7 +157,7 @@ class AdminRoleModify(BaseModel): limits: RoleLimits | None = None features: RoleFeatures | None = None access: RoleAccess | None = None - hwid: RoleHWIDSettings | None = None + hwid: HWIDSettings | None = None disabled_when_limited: bool | None = None disable_users_when_limited: bool | None = None diff --git a/app/operation/subscription.py b/app/operation/subscription.py index e802b5ea..c5c34e08 100644 --- a/app/operation/subscription.py +++ b/app/operation/subscription.py @@ -12,9 +12,8 @@ register_user_hwid, ) from app.db.crud.user import get_user_usages, user_sub_update -from app.db.models import HWIDPolicy, User +from app.db.models import User from app.models.admin import AdminDetails -from app.models.admin_role import RoleHWIDSettings from app.models.settings import Application, ConfigFormat, HWIDSettings, SubRule, Subscription as SubSettings from app.models.stats import UserUsageStatsList from app.models.subscription import SubscriptionUsageQuery @@ -282,36 +281,36 @@ async def validate_and_register_hwid( db: AsyncSession, user_id: int, user_hwid_limit: int | None, - role_hwid_settings: RoleHWIDSettings | dict | None, + role_hwid_settings: HWIDSettings | dict | None, x_hwid: str | None, x_device_os: str | None, x_ver_os: str | None, x_device_model: str | None, ): - hwid_conf: HWIDSettings = await hwid_settings() + global_hwid_conf: HWIDSettings = await hwid_settings() + effective_hwid_conf = global_hwid_conf if isinstance(role_hwid_settings, dict): - try: - role_hwid_settings = RoleHWIDSettings.model_validate(role_hwid_settings) - except Exception: + if not role_hwid_settings: role_hwid_settings = None - - role_enabled = role_hwid_settings.enabled if role_hwid_settings else HWIDPolicy.use_panel - role_forced = role_hwid_settings.forced if role_hwid_settings else None - - effective_enabled = hwid_conf.enabled - if role_enabled == HWIDPolicy.on: - effective_enabled = True - elif role_enabled == HWIDPolicy.off: - effective_enabled = False - - if not effective_enabled: + else: + try: + role_hwid_settings = HWIDSettings.model_validate(role_hwid_settings) + except Exception: + role_hwid_settings = None + + # Role override applies only when role hwid exists and enabled is True. + if role_hwid_settings is not None: + if role_hwid_settings.enabled: + effective_hwid_conf = role_hwid_settings + else: + return + + if not effective_hwid_conf.enabled: return - effective_forced = hwid_conf.forced if role_forced is None else role_forced - if not x_hwid: - if effective_forced: + if effective_hwid_conf.forced: await self.raise_error(message="HWID header required", code=403) return @@ -321,7 +320,7 @@ async def validate_and_register_hwid( return # It's a new HWID, check limit - limit = user_hwid_limit if user_hwid_limit is not None else hwid_conf.fallback_limit + limit = user_hwid_limit if user_hwid_limit is not None else effective_hwid_conf.fallback_limit if limit == 0: pass # unlimited else: diff --git a/app/operation/user.py b/app/operation/user.py index 5820a5aa..8c95f0e3 100644 --- a/app/operation/user.py +++ b/app/operation/user.py @@ -55,6 +55,7 @@ from app.db.models import User, UserStatus, UserTemplate from app.models.admin import AdminDetails from app.models.proxy import ProxyTable +from app.models.settings import HWIDSettings from app.models.stats import ( Period, UserCountMetric, @@ -94,7 +95,7 @@ UserUsageQuery, WireGuardPeerIPsReallocateResponse, ) -from app.node.sync import remove_user as sync_remove_user, sync_users, sync_user +from app.node.sync import remove_user as sync_remove_user, sync_user, sync_users from app.operation import BaseOperation, OperatorType from app.operation.permissions import ( PermissionDenied, @@ -105,9 +106,9 @@ is_scope_all, ) from app.settings import hwid_settings, subscription_settings +from app.utils.helpers import fix_datetime_timezone from app.utils.jwt import create_subscription_token from app.utils.logger import get_logger -from app.utils.helpers import fix_datetime_timezone from app.utils.system import readable_duration, readable_size from app.utils.wireguard import ( build_wireguard_peer_ip_allocator, @@ -129,6 +130,27 @@ def _has_permission(admin: AdminDetails, resource: str, action: str) -> bool: return False +def _resolve_effective_hwid_settings( + global_hwid: HWIDSettings, role_hwid: HWIDSettings | dict | None +) -> HWIDSettings | None: + if isinstance(role_hwid, dict): + if not role_hwid: + role_hwid = None + else: + try: + role_hwid = HWIDSettings.model_validate(role_hwid) + except Exception: + role_hwid = None + + if role_hwid is None: + return global_hwid + + if not role_hwid.enabled: + return None + + return role_hwid + + logger = get_logger("user-operation") _USER_AGENT_SPLIT_RE = re.compile(r"[;/\s\(\)]+") @@ -142,9 +164,9 @@ def _is_non_blocking_sync_operator(operator_type: OperatorType) -> bool: @staticmethod def _format_validation_errors(error: ValidationError) -> str: - return "; ".join( - [f"{'.'.join(str(loc_part) for loc_part in err['loc'])}: {err['msg']}" for err in error.errors()] - ) + return "; ".join([ + f"{'.'.join(str(loc_part) for loc_part in err['loc'])}: {err['msg']}" for err in error.errors() + ]) @staticmethod async def generate_subscription_url(user: UserNotificationResponse): @@ -515,10 +537,14 @@ async def _enforce_user_limits( async def create_user( self, db: AsyncSession, new_user: UserCreate, admin: AdminDetails, *, skip_role_limits: bool = False ) -> UserResponse: - hwid_conf = await hwid_settings() + global_hwid_conf = await hwid_settings() + effective_hwid_conf = _resolve_effective_hwid_settings( + global_hwid_conf, + admin.role.hwid if admin.role is not None else None, + ) if new_user.hwid_limit is None: - new_user.hwid_limit = hwid_conf.fallback_limit + new_user.hwid_limit = 0 if effective_hwid_conf is None else effective_hwid_conf.fallback_limit if not skip_role_limits: await self._enforce_user_limits( @@ -534,11 +560,17 @@ async def create_user( check_max_users=True, ) - if new_user.hwid_limit is not None and not admin.is_owner: - if new_user.hwid_limit < hwid_conf.min_limit: - await self.raise_error(message=f"HWID limit cannot be less than {hwid_conf.min_limit}", code=400, db=db) - if hwid_conf.max_limit > 0 and (new_user.hwid_limit > hwid_conf.max_limit or new_user.hwid_limit == 0): - await self.raise_error(message=f"HWID limit cannot exceed {hwid_conf.max_limit}", code=400, db=db) + if new_user.hwid_limit is not None and not admin.is_owner and effective_hwid_conf is not None: + if new_user.hwid_limit < effective_hwid_conf.min_limit: + await self.raise_error( + message=f"HWID limit cannot be less than {effective_hwid_conf.min_limit}", code=400, db=db + ) + if effective_hwid_conf.max_limit > 0 and ( + new_user.hwid_limit > effective_hwid_conf.max_limit or new_user.hwid_limit == 0 + ): + await self.raise_error( + message=f"HWID limit cannot exceed {effective_hwid_conf.max_limit}", code=400, db=db + ) if new_user.next_plan is not None and new_user.next_plan.user_template_id is not None: await self.get_validated_user_template(db, new_user.next_plan.user_template_id) @@ -625,13 +657,22 @@ async def _prepare_modified_user( ) if modified_user.hwid_limit is not None and not admin.is_owner: - hwid_conf = await hwid_settings() - if modified_user.hwid_limit < hwid_conf.min_limit: - await self.raise_error(message=f"HWID limit cannot be less than {hwid_conf.min_limit}", code=400, db=db) - if hwid_conf.max_limit > 0 and ( - modified_user.hwid_limit > hwid_conf.max_limit or modified_user.hwid_limit == 0 - ): - await self.raise_error(message=f"HWID limit cannot exceed {hwid_conf.max_limit}", code=400, db=db) + global_hwid_conf = await hwid_settings() + effective_hwid_conf = _resolve_effective_hwid_settings( + global_hwid_conf, + admin.role.hwid if admin.role is not None else None, + ) + if effective_hwid_conf is not None: + if modified_user.hwid_limit < effective_hwid_conf.min_limit: + await self.raise_error( + message=f"HWID limit cannot be less than {effective_hwid_conf.min_limit}", code=400, db=db + ) + if effective_hwid_conf.max_limit > 0 and ( + modified_user.hwid_limit > effective_hwid_conf.max_limit or modified_user.hwid_limit == 0 + ): + await self.raise_error( + message=f"HWID limit cannot exceed {effective_hwid_conf.max_limit}", code=400, db=db + ) validated_groups = None if modified_user.group_ids: diff --git a/tests/api/test_admin_role.py b/tests/api/test_admin_role.py index 17e21579..31e358a9 100644 --- a/tests/api/test_admin_role.py +++ b/tests/api/test_admin_role.py @@ -29,7 +29,13 @@ def _role_payload(name: str | None = None) -> dict: }, "features": {"can_use_reset_strategy": True, "can_use_next_plan": True}, "access": {"require_template": False, "allowed_template_ids": None, "allowed_group_ids": None}, - "hwid": {"enabled": "use_panel", "forced": False}, + "hwid": { + "enabled": False, + "forced": False, + "fallback_limit": 0, + "min_limit": 0, + "max_limit": 0, + }, } @@ -193,29 +199,47 @@ def test_create_role(access_token): data = response.json() assert data["name"] == name assert data["is_owner"] is False - assert data["hwid"] == {"enabled": "use_panel", "forced": False} + assert data["hwid"] == { + "enabled": False, + "forced": False, + "fallback_limit": 0, + "min_limit": 0, + "max_limit": 0, + } _delete_role(access_token, data["id"]) def test_create_and_modify_role_hwid_policy(access_token): """Owner can set and update per-role HWID policy.""" payload = _role_payload() - payload["hwid"] = {"enabled": "off", "forced": True} + payload["hwid"] = {"enabled": False, "forced": True} response = client.post("/api/admin-role", headers=auth_headers(access_token), json=payload) assert response.status_code == status.HTTP_201_CREATED role = response.json() try: - assert role["hwid"] == {"enabled": "off", "forced": True} + assert role["hwid"] == { + "enabled": False, + "forced": True, + "fallback_limit": 0, + "min_limit": 0, + "max_limit": 0, + } update_response = client.put( f"/api/admin-role/{role['id']}", headers=auth_headers(access_token), - json={"hwid": {"enabled": "on", "forced": False}}, + json={"hwid": {"enabled": True, "forced": False}}, ) assert update_response.status_code == status.HTTP_200_OK - assert update_response.json()["hwid"] == {"enabled": "on", "forced": False} + assert update_response.json()["hwid"] == { + "enabled": True, + "forced": False, + "fallback_limit": 0, + "min_limit": 0, + "max_limit": 0, + } finally: _delete_role(access_token, role["id"]) diff --git a/tests/api/test_hwid.py b/tests/api/test_hwid.py index 29aaae6a..ff6141e5 100644 --- a/tests/api/test_hwid.py +++ b/tests/api/test_hwid.py @@ -133,23 +133,23 @@ def _login(username: str, password: str) -> str: assert response.status_code == status.HTTP_200_OK return response.json()["access_token"] - def _create_role(hwid_policy: str) -> dict: + def _create_role(enabled: bool) -> dict: payload = { - "name": unique_name(f"role_hwid_{hwid_policy}"), + "name": unique_name(f"role_hwid_{'enabled' if enabled else 'disabled'}"), "permissions": { "users": {"create": True, "read": {"scope": 2}, "delete": {"scope": 2}}, }, "limits": {}, "features": {}, "access": {}, - "hwid": {"enabled": hwid_policy, "forced": False}, + "hwid": {"enabled": enabled, "forced": False}, } response = client.post("/api/admin-role", headers=auth_headers(access_token), json=payload) assert response.status_code == status.HTTP_201_CREATED return response.json() - role_off = _create_role("off") - role_on = _create_role("on") + role_off = _create_role(False) + role_on = _create_role(True) admin_off = create_admin(access_token, role_id=role_off["id"]) admin_on = create_admin(access_token, role_id=role_on["id"]) user_off = None From 2acdcf648d05f5536d01510f1868e6e5e4da2f36 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Fri, 22 May 2026 17:11:48 +0330 Subject: [PATCH 03/10] feat(admin-role): update HWID field initialization in AdminRole and AdminRoleData models --- app/db/crud/admin_role.py | 2 +- app/db/models.py | 11 +---------- app/models/admin.py | 2 +- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/app/db/crud/admin_role.py b/app/db/crud/admin_role.py index df73cc7a..8d4b8e2c 100644 --- a/app/db/crud/admin_role.py +++ b/app/db/crud/admin_role.py @@ -52,7 +52,7 @@ async def create_role(db: AsyncSession, data: AdminRoleCreate) -> AdminRole: limits=data.limits.model_dump(), features=data.features.model_dump(), access=data.access.model_dump(), - hwid=data.hwid.model_dump() if data.hwid is not None else {}, + hwid=data.hwid.model_dump(), disabled_when_limited=data.disabled_when_limited, disable_users_when_limited=data.disable_users_when_limited, ) diff --git a/app/db/models.py b/app/db/models.py index f361ecca..4256097c 100644 --- a/app/db/models.py +++ b/app/db/models.py @@ -886,16 +886,7 @@ class AdminRole(Base): limits: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) features: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) access: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) - hwid: Mapped[Dict] = mapped_column( - PostgresJSONB, - default_factory=lambda: { - "enabled": False, - "forced": False, - "fallback_limit": 0, - "min_limit": 0, - "max_limit": 0, - }, - ) + hwid: Mapped[Dict] = mapped_column(PostgresJSONB) disabled_when_limited: Mapped[bool] = mapped_column(default=False, server_default="0") disable_users_when_limited: Mapped[bool] = mapped_column(default=True, server_default="1") created_at: Mapped[dt] = mapped_column(DateTime(timezone=True), default_factory=lambda: dt.now(tz.utc), init=False) diff --git a/app/models/admin.py b/app/models/admin.py index 956401a1..18589fa5 100644 --- a/app/models/admin.py +++ b/app/models/admin.py @@ -59,7 +59,7 @@ class AdminRoleData(BaseModel): limits: RoleLimits = Field(default_factory=RoleLimits) features: RoleFeatures = Field(default_factory=RoleFeatures) access: RoleAccess = Field(default_factory=RoleAccess) - hwid: HWIDSettings | None = None + hwid: HWIDSettings = Field(default_factory=HWIDSettings) disabled_when_limited: bool = False disable_users_when_limited: bool = True From 3a2796243d1ea047a2c8670c7d568d72c7e60827 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Fri, 22 May 2026 17:59:39 +0330 Subject: [PATCH 04/10] fix migration --- app/db/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/db/models.py b/app/db/models.py index 4256097c..0b14eb97 100644 --- a/app/db/models.py +++ b/app/db/models.py @@ -886,7 +886,7 @@ class AdminRole(Base): limits: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) features: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) access: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) - hwid: Mapped[Dict] = mapped_column(PostgresJSONB) + hwid: Mapped[Dict] = mapped_column(PostgresJSONB, default_factory=dict) disabled_when_limited: Mapped[bool] = mapped_column(default=False, server_default="0") disable_users_when_limited: Mapped[bool] = mapped_column(default=True, server_default="1") created_at: Mapped[dt] = mapped_column(DateTime(timezone=True), default_factory=lambda: dt.now(tz.utc), init=False) From a03a5e9d2ff6610616f7b27bc8d6c467061b4e8e Mon Sep 17 00:00:00 2001 From: Mohammad Date: Fri, 22 May 2026 18:15:10 +0330 Subject: [PATCH 05/10] feat(migration): refactor HWID initialization to use SQLAlchemy update method --- ...e9d34a1f0_add_hwid_policy_to_admin_roles.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py index 4883e63e..817873d2 100644 --- a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py +++ b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py @@ -27,10 +27,22 @@ def upgrade() -> None: ) # Cross-dialect safe JSON initialization (SQLite/MySQL/PostgreSQL) + admin_roles = sa.table( + "admin_roles", + sa.column("hwid", sa.JSON()), + ) op.execute( - "UPDATE admin_roles " - "SET hwid = '{\"enabled\":false,\"forced\":false,\"fallback_limit\":0,\"min_limit\":0,\"max_limit\":0}' " - "WHERE hwid IS NULL" + admin_roles.update() + .where(admin_roles.c.hwid.is_(None)) + .values( + hwid={ + "enabled": False, + "forced": False, + "fallback_limit": 0, + "min_limit": 0, + "max_limit": 0, + } + ) ) with op.batch_alter_table("admin_roles", schema=None) as batch_op: From 2d773dba230374b40bd8e8a828588195f7088c8c Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sat, 23 May 2026 17:45:52 +0330 Subject: [PATCH 06/10] fix tests in postgresql --- app/db/migrations/env.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/app/db/migrations/env.py b/app/db/migrations/env.py index b967dae7..ef73e4a0 100644 --- a/app/db/migrations/env.py +++ b/app/db/migrations/env.py @@ -1,7 +1,9 @@ import asyncio from logging.config import fileConfig +from sqlalchemy import JSON from sqlalchemy import BigInteger from sqlalchemy import pool +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.engine import Connection from sqlalchemy.ext.asyncio import async_engine_from_config from alembic import context @@ -50,6 +52,19 @@ def _compare_type(context, inspected_column, metadata_column, inspected_type, me if sqlite_bigint_equivalent: return False + # PostgreSQL reflection can report JSON with explicit astext_type while + # metadata often renders as bare JSON(), which is not a schema change. + # Keep JSON vs JSONB detection intact by only bypassing plain JSON pairs. + if context.dialect.name == "postgresql": + is_plain_json_pair = ( + isinstance(inspected_type, JSON) + and isinstance(metadata_type, JSON) + and not isinstance(inspected_type, JSONB) + and not isinstance(metadata_type, JSONB) + ) + if is_plain_json_pair: + return False + return None From 4e4934fcde41bcc0a494f37339d503afd78a3ebd Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sat, 23 May 2026 17:52:18 +0330 Subject: [PATCH 07/10] fix --- app/db/migrations/env.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/app/db/migrations/env.py b/app/db/migrations/env.py index ef73e4a0..aea0f8a0 100644 --- a/app/db/migrations/env.py +++ b/app/db/migrations/env.py @@ -42,15 +42,13 @@ def _compare_type(context, inspected_column, metadata_column, inspected_type, me BIGINT depending on how the table was originally created, which can produce false-positive autogenerate diffs. """ - if context.dialect.name != "sqlite": - return None - - sqlite_bigint_equivalent = ( - (isinstance(inspected_type, BigInteger) and isinstance(metadata_type, SqliteCompatibleBigInteger)) - or (isinstance(inspected_type, SqliteCompatibleBigInteger) and isinstance(metadata_type, BigInteger)) - ) - if sqlite_bigint_equivalent: - return False + if context.dialect.name == "sqlite": + sqlite_bigint_equivalent = ( + (isinstance(inspected_type, BigInteger) and isinstance(metadata_type, SqliteCompatibleBigInteger)) + or (isinstance(inspected_type, SqliteCompatibleBigInteger) and isinstance(metadata_type, BigInteger)) + ) + if sqlite_bigint_equivalent: + return False # PostgreSQL reflection can report JSON with explicit astext_type while # metadata often renders as bare JSON(), which is not a schema change. From 7a79b2c232795f66e91850f7d0e43b2ab6f7877b Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sat, 23 May 2026 18:27:53 +0330 Subject: [PATCH 08/10] fix --- app/operation/subscription.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/operation/subscription.py b/app/operation/subscription.py index c5c34e08..a8aa2725 100644 --- a/app/operation/subscription.py +++ b/app/operation/subscription.py @@ -320,7 +320,7 @@ async def validate_and_register_hwid( return # It's a new HWID, check limit - limit = user_hwid_limit if user_hwid_limit is not None else effective_hwid_conf.fallback_limit + limit = user_hwid_limit if user_hwid_limit else effective_hwid_conf.fallback_limit if limit == 0: pass # unlimited else: From 5e84840639dac903c482ced60b08161eec2f5a75 Mon Sep 17 00:00:00 2001 From: Mohammad Date: Sat, 23 May 2026 18:42:28 +0330 Subject: [PATCH 09/10] fix update tests --- tests/api/test_user.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/tests/api/test_user.py b/tests/api/test_user.py index a2083f3d..1e1c3ae9 100644 --- a/tests/api/test_user.py +++ b/tests/api/test_user.py @@ -1,23 +1,23 @@ +import asyncio import io import json +import time import zipfile from base64 import b64encode from copy import deepcopy from datetime import datetime, timedelta, timezone from hashlib import sha256 from math import ceil -import asyncio -import time -from urllib.parse import parse_qs, unquote, urlsplit from unittest.mock import AsyncMock, MagicMock +from urllib.parse import parse_qs, unquote, urlsplit from fastapi import status from sqlalchemy import func, select, update from app.db.crud.hwid import register_user_hwid from app.db.models import NodeUserUsage, User -from app.models.stats import Period, UserCountMetric, UserCountMetricStat, UserCountMetricStatsList from app.models.settings import ConfigFormat, SubRule, Subscription +from app.models.stats import Period, UserCountMetric, UserCountMetricStat, UserCountMetricStatsList from app.operation.subscription import SubscriptionOperation from app.utils import jwt as jwt_utils from app.utils.crypto import generate_wireguard_keypair, get_wireguard_public_key @@ -243,6 +243,7 @@ def test_limited_admin_cannot_create_or_modify_user_to_unlimited_data_or_expire( "expire": finite_expire, "data_limit": 1024 * 1024, "data_limit_reset_strategy": "no_reset", + "hwid_limit": 1, "status": "active", }, ) @@ -1266,13 +1267,11 @@ def test_xray_subscription_uses_host_specific_template_override(access_token): access_token, name=unique_name("xray_host_override_template"), template_type="xray_subscription", - content=json.dumps( - { - "log": {"loglevel": "warning"}, - "inbounds": [{"tag": "placeholder", "protocol": "vmess", "settings": {"clients": []}}], - "outbounds": [{"tag": "template-marker", "protocol": "freedom", "settings": {}}], - } - ), + content=json.dumps({ + "log": {"loglevel": "warning"}, + "inbounds": [{"tag": "placeholder", "protocol": "vmess", "settings": {"clients": []}}], + "outbounds": [{"tag": "template-marker", "protocol": "freedom", "settings": {}}], + }), ) host_response = client.post( @@ -1338,13 +1337,11 @@ def test_xray_subscription_template_override_isolated_per_host(access_token): access_token, name=unique_name("xray_host_isolated_template"), template_type="xray_subscription", - content=json.dumps( - { - "log": {"loglevel": "warning"}, - "inbounds": [{"tag": "placeholder", "protocol": "vmess", "settings": {"clients": []}}], - "outbounds": [{"tag": "template-marker", "protocol": "freedom", "settings": {}}], - } - ), + content=json.dumps({ + "log": {"loglevel": "warning"}, + "inbounds": [{"tag": "placeholder", "protocol": "vmess", "settings": {"clients": []}}], + "outbounds": [{"tag": "template-marker", "protocol": "freedom", "settings": {}}], + }), ) first_host_response = client.post( From 21cf1c4ab1ed4ac15bf95f99513a1706b1a06db7 Mon Sep 17 00:00:00 2001 From: M03ED <50927468+M03ED@users.noreply.github.com> Date: Sat, 23 May 2026 23:58:51 +0330 Subject: [PATCH 10/10] feat(hwid): update HWID policy settings and introduce effective settings resolver --- .gitignore | 1 + ...9d34a1f0_add_hwid_policy_to_admin_roles.py | 8 ++-- app/models/settings.py | 8 ++-- app/operation/subscription.py | 27 +++--------- app/operation/user.py | 42 +++++-------------- app/utils/hwid.py | 29 +++++++++++++ tests/api/test_admin_role.py | 12 +++--- 7 files changed, 61 insertions(+), 66 deletions(-) create mode 100644 app/utils/hwid.py diff --git a/.gitignore b/.gitignore index 89d2f3ef..8e2d5f1b 100644 --- a/.gitignore +++ b/.gitignore @@ -171,3 +171,4 @@ deadlock-analysis.md # AI .AGENT +graphify-out diff --git a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py index 817873d2..415a3967 100644 --- a/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py +++ b/app/db/migrations/versions/2c6e9d34a1f0_add_hwid_policy_to_admin_roles.py @@ -36,11 +36,11 @@ def upgrade() -> None: .where(admin_roles.c.hwid.is_(None)) .values( hwid={ - "enabled": False, + "enabled": True, "forced": False, - "fallback_limit": 0, - "min_limit": 0, - "max_limit": 0, + "fallback_limit": None, + "min_limit": None, + "max_limit": None, } ) ) diff --git a/app/models/settings.py b/app/models/settings.py index 5302bf86..a2939f8e 100644 --- a/app/models/settings.py +++ b/app/models/settings.py @@ -288,11 +288,11 @@ def validate_recommended_apps(cls, v: list[Application]) -> list[Application]: class HWIDSettings(BaseModel): - enabled: bool = Field(default=False) + enabled: bool = Field(default=True) forced: bool = Field(default=False) - fallback_limit: int = Field(default=0, ge=0) - min_limit: int = Field(default=0, ge=0) - max_limit: int = Field(default=0, ge=0) + fallback_limit: int | None = Field(default=None, ge=0) + min_limit: int | None = Field(default=None, ge=0) + max_limit: int | None = Field(default=None, ge=0) class General(BaseModel): diff --git a/app/operation/subscription.py b/app/operation/subscription.py index a8aa2725..9f95e0bb 100644 --- a/app/operation/subscription.py +++ b/app/operation/subscription.py @@ -15,6 +15,7 @@ from app.db.models import User from app.models.admin import AdminDetails from app.models.settings import Application, ConfigFormat, HWIDSettings, SubRule, Subscription as SubSettings +from app.utils.hwid import resolve_effective_hwid_settings from app.models.stats import UserUsageStatsList from app.models.subscription import SubscriptionUsageQuery from app.models.user import SubscriptionUserResponse, UsersResponseWithInbounds @@ -281,32 +282,16 @@ async def validate_and_register_hwid( db: AsyncSession, user_id: int, user_hwid_limit: int | None, - role_hwid_settings: HWIDSettings | dict | None, + role_hwid_settings: HWIDSettings | None, x_hwid: str | None, x_device_os: str | None, x_ver_os: str | None, x_device_model: str | None, ): global_hwid_conf: HWIDSettings = await hwid_settings() - effective_hwid_conf = global_hwid_conf - - if isinstance(role_hwid_settings, dict): - if not role_hwid_settings: - role_hwid_settings = None - else: - try: - role_hwid_settings = HWIDSettings.model_validate(role_hwid_settings) - except Exception: - role_hwid_settings = None - - # Role override applies only when role hwid exists and enabled is True. - if role_hwid_settings is not None: - if role_hwid_settings.enabled: - effective_hwid_conf = role_hwid_settings - else: - return - - if not effective_hwid_conf.enabled: + effective_hwid_conf = resolve_effective_hwid_settings(global_hwid_conf, role_hwid_settings) + + if effective_hwid_conf is None or not effective_hwid_conf.enabled: return if not x_hwid: @@ -320,7 +305,7 @@ async def validate_and_register_hwid( return # It's a new HWID, check limit - limit = user_hwid_limit if user_hwid_limit else effective_hwid_conf.fallback_limit + limit = user_hwid_limit if user_hwid_limit is not None else effective_hwid_conf.fallback_limit if limit == 0: pass # unlimited else: diff --git a/app/operation/user.py b/app/operation/user.py index fb9700ce..38544de6 100644 --- a/app/operation/user.py +++ b/app/operation/user.py @@ -107,6 +107,7 @@ ) from app.settings import hwid_settings, subscription_settings from app.utils.helpers import fix_datetime_timezone +from app.utils.hwid import resolve_effective_hwid_settings from app.utils.jwt import create_subscription_token from app.utils.logger import get_logger from app.utils.system import readable_duration, readable_size @@ -130,27 +131,6 @@ def _has_permission(admin: AdminDetails, resource: str, action: str) -> bool: return False -def _resolve_effective_hwid_settings( - global_hwid: HWIDSettings, role_hwid: HWIDSettings | dict | None -) -> HWIDSettings | None: - if isinstance(role_hwid, dict): - if not role_hwid: - role_hwid = None - else: - try: - role_hwid = HWIDSettings.model_validate(role_hwid) - except Exception: - role_hwid = None - - if role_hwid is None: - return global_hwid - - if not role_hwid.enabled: - return None - - return role_hwid - - logger = get_logger("user-operation") _USER_AGENT_SPLIT_RE = re.compile(r"[;/\s\(\)]+") @@ -164,9 +144,9 @@ def _is_non_blocking_sync_operator(operator_type: OperatorType) -> bool: @staticmethod def _format_validation_errors(error: ValidationError) -> str: - return "; ".join([ - f"{'.'.join(str(loc_part) for loc_part in err['loc'])}: {err['msg']}" for err in error.errors() - ]) + return "; ".join( + [f"{'.'.join(str(loc_part) for loc_part in err['loc'])}: {err['msg']}" for err in error.errors()] + ) @staticmethod async def generate_subscription_url(user: UserNotificationResponse): @@ -538,13 +518,13 @@ async def create_user( self, db: AsyncSession, new_user: UserCreate, admin: AdminDetails, *, skip_role_limits: bool = False ) -> UserResponse: global_hwid_conf = await hwid_settings() - effective_hwid_conf = _resolve_effective_hwid_settings( + effective_hwid_conf = resolve_effective_hwid_settings( global_hwid_conf, admin.role.hwid if admin.role is not None else None, ) if new_user.hwid_limit is None: - new_user.hwid_limit = 0 if effective_hwid_conf is None else effective_hwid_conf.fallback_limit + new_user.hwid_limit = 0 if effective_hwid_conf is None else (effective_hwid_conf.fallback_limit or 0) if not skip_role_limits: await self._enforce_user_limits( @@ -561,11 +541,11 @@ async def create_user( ) if new_user.hwid_limit is not None and not admin.is_owner and effective_hwid_conf is not None: - if new_user.hwid_limit < effective_hwid_conf.min_limit: + if effective_hwid_conf.min_limit is not None and new_user.hwid_limit < effective_hwid_conf.min_limit: await self.raise_error( message=f"HWID limit cannot be less than {effective_hwid_conf.min_limit}", code=400, db=db ) - if effective_hwid_conf.max_limit > 0 and ( + if effective_hwid_conf.max_limit is not None and effective_hwid_conf.max_limit > 0 and ( new_user.hwid_limit > effective_hwid_conf.max_limit or new_user.hwid_limit == 0 ): await self.raise_error( @@ -658,16 +638,16 @@ async def _prepare_modified_user( if modified_user.hwid_limit is not None and not admin.is_owner: global_hwid_conf = await hwid_settings() - effective_hwid_conf = _resolve_effective_hwid_settings( + effective_hwid_conf = resolve_effective_hwid_settings( global_hwid_conf, admin.role.hwid if admin.role is not None else None, ) if effective_hwid_conf is not None: - if modified_user.hwid_limit < effective_hwid_conf.min_limit: + if effective_hwid_conf.min_limit is not None and modified_user.hwid_limit < effective_hwid_conf.min_limit: await self.raise_error( message=f"HWID limit cannot be less than {effective_hwid_conf.min_limit}", code=400, db=db ) - if effective_hwid_conf.max_limit > 0 and ( + if effective_hwid_conf.max_limit is not None and effective_hwid_conf.max_limit > 0 and ( modified_user.hwid_limit > effective_hwid_conf.max_limit or modified_user.hwid_limit == 0 ): await self.raise_error( diff --git a/app/utils/hwid.py b/app/utils/hwid.py new file mode 100644 index 00000000..75008eb0 --- /dev/null +++ b/app/utils/hwid.py @@ -0,0 +1,29 @@ +from app.models.settings import HWIDSettings + + +def resolve_effective_hwid_settings( + global_hwid: HWIDSettings, role_hwid: HWIDSettings | dict | None +) -> HWIDSettings | None: + if isinstance(role_hwid, dict): + if not role_hwid: + role_hwid = None + else: + try: + role_hwid = HWIDSettings.model_validate(role_hwid) + except Exception: + role_hwid = None + + if role_hwid is None: + return global_hwid + + if not role_hwid.enabled: + return None + + # enabled=True: None fields inherit from global, explicit values (including 0) override + return HWIDSettings( + enabled=True, + forced=role_hwid.forced, + fallback_limit=role_hwid.fallback_limit if role_hwid.fallback_limit is not None else global_hwid.fallback_limit, + min_limit=role_hwid.min_limit if role_hwid.min_limit is not None else global_hwid.min_limit, + max_limit=role_hwid.max_limit if role_hwid.max_limit is not None else global_hwid.max_limit, + ) diff --git a/tests/api/test_admin_role.py b/tests/api/test_admin_role.py index 31e358a9..c7154b98 100644 --- a/tests/api/test_admin_role.py +++ b/tests/api/test_admin_role.py @@ -222,9 +222,9 @@ def test_create_and_modify_role_hwid_policy(access_token): assert role["hwid"] == { "enabled": False, "forced": True, - "fallback_limit": 0, - "min_limit": 0, - "max_limit": 0, + "fallback_limit": None, + "min_limit": None, + "max_limit": None, } update_response = client.put( @@ -236,9 +236,9 @@ def test_create_and_modify_role_hwid_policy(access_token): assert update_response.json()["hwid"] == { "enabled": True, "forced": False, - "fallback_limit": 0, - "min_limit": 0, - "max_limit": 0, + "fallback_limit": None, + "min_limit": None, + "max_limit": None, } finally: _delete_role(access_token, role["id"])