Skip to content

Commit 21a847c

Browse files
authored
feat: Migration for legacy library permissions (openedx#134)
1 parent fe1d9bc commit 21a847c

6 files changed

Lines changed: 343 additions & 2 deletions

File tree

CHANGELOG.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ Unreleased
1616

1717
*
1818

19+
0.18.0 - 2025-11-17
20+
********************
21+
22+
Added
23+
=====
24+
25+
* Migration to transfer legacy permissions from ContentLibraryPermission to the new Casbin-based authorization model.
26+
1927
0.17.1 - 2025-11-14
2028
********************
2129

openedx_authz/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44

55
import os
66

7-
__version__ = "0.17.1"
7+
__version__ = "0.18.0"
88

99
ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))

openedx_authz/engine/utils.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88

99
from casbin import Enforcer
1010

11+
from openedx_authz.api.users import assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope
12+
from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_AUTHOR, LIBRARY_USER
13+
1114
logger = logging.getLogger(__name__)
1215

1316
GROUPING_POLICY_PTYPES = ["g", "g2", "g3", "g4", "g5", "g6"]
@@ -69,3 +72,82 @@ def migrate_policy_between_enforcers(
6972
except Exception as e:
7073
logger.error(f"Error loading policies from file: {e}")
7174
raise
75+
76+
77+
def migrate_legacy_permissions(ContentLibraryPermission):
78+
"""
79+
Migrate legacy permission data to the new Casbin-based authorization model.
80+
This function reads legacy permissions from the ContentLibraryPermission model
81+
and assigns equivalent roles in the new authorization system.
82+
83+
The old Library permissions are stored in the ContentLibraryPermission model, it consists of the following columns:
84+
85+
- library: FK to ContentLibrary
86+
- user: optional FK to User
87+
- group: optional FK to Group
88+
- access_level: 'admin' | 'author' | 'read'
89+
90+
In the new Authz model, this would roughly translate to:
91+
92+
- library: scope
93+
- user: subject
94+
- access_level: role
95+
96+
Now, we don't have an equivalent concept to "Group", for this we will go through the users in the group and assign
97+
roles independently.
98+
99+
param ContentLibraryPermission: The ContentLibraryPermission model to use.
100+
"""
101+
102+
legacy_permissions = ContentLibraryPermission.objects.select_related(
103+
"library", "library__org", "user", "group"
104+
).all()
105+
106+
# List to keep track of any permissions that could not be migrated
107+
permissions_with_errors = []
108+
109+
for permission in legacy_permissions:
110+
# Migrate the permission to the new model
111+
112+
# Derive equivalent role based on access level
113+
access_level_to_role = {
114+
"admin": LIBRARY_ADMIN,
115+
"author": LIBRARY_AUTHOR,
116+
"read": LIBRARY_USER,
117+
}
118+
119+
role = access_level_to_role.get(permission.access_level)
120+
if role is None:
121+
# This should not happen as there are no more access_levels defined
122+
# in ContentLibraryPermission, log and skip
123+
logger.error(f"Unknown access level: {permission.access_level} for User: {permission.user}")
124+
permissions_with_errors.append(permission)
125+
continue
126+
127+
# Generating scope based on library identifier
128+
scope = f"lib:{permission.library.org.name}:{permission.library.slug}"
129+
130+
if permission.group:
131+
# Permission applied to a group
132+
users = [user.username for user in permission.group.user_set.all()]
133+
logger.info(
134+
f"Migrating permissions for Users: {users} in Group: {permission.group.name} "
135+
f"to Role: {role.external_key} in Scope: {scope}"
136+
)
137+
batch_assign_role_to_users_in_scope(
138+
users=users, role_external_key=role.external_key, scope_external_key=scope
139+
)
140+
else:
141+
# Permission applied to individual user
142+
logger.info(
143+
f"Migrating permission for User: {permission.user.username} "
144+
f"to Role: {role.external_key} in Scope: {scope}"
145+
)
146+
147+
assign_role_to_user_in_scope(
148+
user_external_key=permission.user.username,
149+
role_external_key=role.external_key,
150+
scope_external_key=scope,
151+
)
152+
153+
return permissions_with_errors
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Generated by Django 5.2.7 on 2025-11-03 20:39
2+
3+
import logging
4+
5+
from django.db import migrations
6+
7+
from openedx_authz.engine.utils import migrate_legacy_permissions
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def _log_migration_errors(permissions_with_errors: list) -> None:
13+
"""
14+
Log the permissions that could not be migrated during the migration process.
15+
Args:
16+
permissions_with_errors (list): List of ContentLibraryPermission instances that failed to migrate.
17+
"""
18+
logger.error(
19+
f"Migration completed with errors for {len(permissions_with_errors)} permissions.\n"
20+
"The following permissions could not be migrated:"
21+
)
22+
for permission in permissions_with_errors:
23+
logger.error(
24+
"Access level: %s, %sLibrary: %s",
25+
permission.access_level,
26+
f"User: {permission.user.username}, " if permission.user else f"Group: {permission.group.name}, ",
27+
permission.library.slug,
28+
)
29+
30+
31+
def apply_migrate_legacy_permissions(apps, schema_editor):
32+
"""
33+
Wrapper to run the migration using the historical version of the ContentLibraryPermission model.
34+
"""
35+
# ContentLibraryPermission model from the content_libraries app, here is where the legacy permissions are stored
36+
try:
37+
ContentLibraryPermission = apps.get_model("content_libraries", "ContentLibraryPermission")
38+
except LookupError:
39+
# Don't run the migration where the content_libraries app is not installed, like during development.
40+
logger.warning("ContentLibraryPermission model not found. Skipping migration.")
41+
return
42+
43+
permissions_with_errors = migrate_legacy_permissions(ContentLibraryPermission)
44+
45+
if permissions_with_errors:
46+
_log_migration_errors(permissions_with_errors)
47+
48+
49+
class Migration(migrations.Migration):
50+
"""
51+
Migration to transfer legacy permissions from ContentLibraryPermission
52+
to the new Casbin-based authorization model.
53+
"""
54+
55+
dependencies = [
56+
("openedx_authz", "0004_contentlibraryscope"),
57+
]
58+
59+
operations = [
60+
migrations.RunPython(apply_migrate_legacy_permissions),
61+
]

openedx_authz/tests/stubs/models.py

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,25 @@
44
referenced in FK relationships without requiring the full application context.
55
"""
66

7+
from django.conf import settings
8+
from django.contrib.auth.models import Group
79
from django.db import models
810
from opaque_keys.edx.locator import LibraryLocatorV2
911

1012

13+
class Organization(models.Model):
14+
"""Stub model representing an organization for testing purposes.
15+
16+
.. no_pii:
17+
"""
18+
19+
name = models.CharField(max_length=255)
20+
short_name = models.CharField(max_length=100)
21+
22+
def __str__(self):
23+
return str(self.name)
24+
25+
1126
class ContentLibraryManager(models.Manager):
1227
"""Manager for ContentLibrary model with helper methods."""
1328

@@ -38,9 +53,37 @@ class ContentLibrary(models.Model):
3853

3954
locator = models.CharField(max_length=255, unique=True, db_index=True)
4055
title = models.CharField(max_length=255, blank=True, null=True)
56+
slug = models.SlugField(allow_unicode=True)
57+
org = models.ForeignKey(Organization, on_delete=models.PROTECT, null=True)
4158
created_at = models.DateTimeField(auto_now_add=True)
4259

4360
objects = ContentLibraryManager()
4461

4562
def __str__(self):
46-
return self.locator
63+
return str(self.locator)
64+
65+
66+
# Legacy permission models for testing purposes
67+
class ContentLibraryPermission(models.Model):
68+
"""Stub model representing legacy content library permissions for testing purposes.
69+
70+
.. no_pii:
71+
"""
72+
73+
ADMIN_LEVEL = "admin"
74+
AUTHOR_LEVEL = "author"
75+
READ_LEVEL = "read"
76+
ACCESS_LEVEL_CHOICES = (
77+
(ADMIN_LEVEL, "Administer users and author content"),
78+
(AUTHOR_LEVEL, "Author content"),
79+
(READ_LEVEL, "Read-only"),
80+
)
81+
82+
library = models.ForeignKey(ContentLibrary, on_delete=models.CASCADE)
83+
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, null=True, blank=True)
84+
group = models.ForeignKey(Group, on_delete=models.CASCADE, null=True, blank=True)
85+
access_level = models.CharField(max_length=30, choices=ACCESS_LEVEL_CHOICES)
86+
87+
def __str__(self):
88+
who = self.user.username if self.user else self.group.name
89+
return f"ContentLibraryPermission ({self.access_level} for {who})"
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
"""Unit Tests for openedx_authz migrations."""
2+
3+
from django.contrib.auth import get_user_model
4+
from django.contrib.auth.models import Group
5+
from django.test import TestCase
6+
7+
from openedx_authz.api.users import batch_unassign_role_from_users, get_user_role_assignments_in_scope
8+
from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_USER
9+
from openedx_authz.engine.enforcer import AuthzEnforcer
10+
from openedx_authz.engine.utils import migrate_legacy_permissions
11+
from openedx_authz.tests.stubs.models import ContentLibrary, ContentLibraryPermission, Organization
12+
13+
User = get_user_model()
14+
15+
# Specify a unique prefix to avoid collisions with existing data
16+
OBJECT_PREFIX = "tmlp_"
17+
18+
org_name = f"{OBJECT_PREFIX}org"
19+
lib_name = f"{OBJECT_PREFIX}library"
20+
group_name = f"{OBJECT_PREFIX}test_group"
21+
user_names = [f"{OBJECT_PREFIX}user{i}" for i in range(3)]
22+
group_user_names = [f"{OBJECT_PREFIX}guser{i}" for i in range(3)]
23+
error_user_name = f"{OBJECT_PREFIX}error_user"
24+
error_group_name = f"{OBJECT_PREFIX}error_group"
25+
empty_group_name = f"{OBJECT_PREFIX}empty_group"
26+
27+
28+
class TestLegacyPermissionsMigration(TestCase):
29+
"""Test cases for migrating legacy permissions."""
30+
31+
def setUp(self):
32+
"""
33+
Set up test data:
34+
35+
What this does:
36+
1. Creates an Org and a ContentLibrary
37+
2. Create Users and Groups
38+
3. Assign legacy permissions using ContentLibraryPermission
39+
4. Create invalid permissions for user and group
40+
"""
41+
# Create ContentLibrary
42+
43+
org = Organization.objects.create(name=org_name, short_name=org_name)
44+
library = ContentLibrary.objects.create(org=org, slug=lib_name)
45+
46+
# Create Users and Groups
47+
users = [
48+
User.objects.create_user(username=user_name, email=f"{user_name}@example.com") for user_name in user_names
49+
]
50+
51+
group_users = [
52+
User.objects.create_user(username=user_name, email=f"{user_name}@example.com")
53+
for user_name in group_user_names
54+
]
55+
group = Group.objects.create(name=group_name)
56+
group.user_set.set(group_users)
57+
58+
error_user = User.objects.create_user(username=error_user_name, email=f"{error_user_name}@example.com")
59+
error_group = Group.objects.create(name=error_group_name)
60+
error_group.user_set.set([error_user])
61+
62+
empty_group = Group.objects.create(name=empty_group_name)
63+
64+
# Assign legacy permissions for users and group
65+
for user in users:
66+
ContentLibraryPermission.objects.create(
67+
user=user,
68+
library=library,
69+
access_level=ContentLibraryPermission.ADMIN_LEVEL,
70+
)
71+
72+
ContentLibraryPermission.objects.create(
73+
group=group,
74+
library=library,
75+
access_level=ContentLibraryPermission.READ_LEVEL,
76+
)
77+
78+
# Create invalid permissions for testing error logging
79+
ContentLibraryPermission.objects.create(
80+
user=error_user,
81+
library=library,
82+
access_level="invalid",
83+
)
84+
ContentLibraryPermission.objects.create(
85+
group=error_group,
86+
library=library,
87+
access_level="invalid",
88+
)
89+
90+
# Edge case: empty group with no users
91+
ContentLibraryPermission.objects.create(
92+
group=empty_group,
93+
library=library,
94+
access_level=ContentLibraryPermission.READ_LEVEL,
95+
)
96+
97+
def tearDown(self):
98+
"""
99+
Clean up test data created for the migration test.
100+
"""
101+
super().tearDown()
102+
103+
AuthzEnforcer.get_enforcer().load_policy()
104+
batch_unassign_role_from_users(
105+
users=user_names,
106+
role_external_key=LIBRARY_ADMIN.external_key,
107+
scope_external_key=f"lib:{org_name}:{lib_name}",
108+
)
109+
batch_unassign_role_from_users(
110+
users=group_user_names,
111+
role_external_key=LIBRARY_USER.external_key,
112+
scope_external_key=f"lib:{org_name}:{lib_name}",
113+
)
114+
115+
ContentLibrary.objects.filter(slug=lib_name).delete()
116+
Organization.objects.filter(name=org_name).delete()
117+
Group.objects.filter(name=group_name).delete()
118+
Group.objects.filter(name=error_group_name).delete()
119+
Group.objects.filter(name=empty_group_name).delete()
120+
for user_name in user_names + group_user_names + [error_user_name]:
121+
User.objects.filter(username=user_name).delete()
122+
123+
def test_migration(self):
124+
"""Test the migration of legacy permissions.
125+
1. Rus the migration to migrate legacy permissions.
126+
2. Check that each user has the expected role in the new model.
127+
3. Check that the group users have the expected role in the new model.
128+
4. Check that invalid permissions were identified correctly as errors.
129+
"""
130+
131+
permissions_with_errors = migrate_legacy_permissions(ContentLibraryPermission)
132+
133+
AuthzEnforcer.get_enforcer().load_policy()
134+
for user_name in user_names:
135+
assignments = get_user_role_assignments_in_scope(
136+
user_external_key=user_name, scope_external_key=f"lib:{org_name}:{lib_name}"
137+
)
138+
self.assertEqual(len(assignments), 1)
139+
self.assertEqual(assignments[0].roles[0], LIBRARY_ADMIN)
140+
for group_user_name in group_user_names:
141+
assignments = get_user_role_assignments_in_scope(
142+
user_external_key=group_user_name, scope_external_key=f"lib:{org_name}:{lib_name}"
143+
)
144+
self.assertEqual(len(assignments), 1)
145+
self.assertEqual(assignments[0].roles[0], LIBRARY_USER)
146+
147+
self.assertEqual(len(permissions_with_errors), 2)

0 commit comments

Comments
 (0)