diff --git a/cms/djangoapps/contentstore/api/tests/base.py b/cms/djangoapps/contentstore/api/tests/base.py
index a169c431e419..dd460734645c 100644
--- a/cms/djangoapps/contentstore/api/tests/base.py
+++ b/cms/djangoapps/contentstore/api/tests/base.py
@@ -18,6 +18,8 @@ class BaseCourseViewTest(SharedModuleStoreTestCase, APITestCase):
Base test class for course data views.
"""
view_name = None # The name of the view to use in reverse() call in self.get_url()
+ course_key_arg_name = 'course_id'
+ extra_request_args = {}
@classmethod
def setUpClass(cls):
@@ -86,9 +88,10 @@ def get_url(self, course_id):
"""
Helper function to create the url
"""
+ args = {
+ self.course_key_arg_name: course_id,
+ }
return reverse(
self.view_name,
- kwargs={
- 'course_id': course_id
- }
+ kwargs= args | self.extra_request_args
)
diff --git a/cms/djangoapps/contentstore/views/import_export.py b/cms/djangoapps/contentstore/views/import_export.py
index 22310faa1ae2..26d5b790cf52 100644
--- a/cms/djangoapps/contentstore/views/import_export.py
+++ b/cms/djangoapps/contentstore/views/import_export.py
@@ -11,6 +11,7 @@
import re
import shutil
from wsgiref.util import FileWrapper
+from openedx_authz.constants.permissions import COURSES_EXPORT_COURSE, COURSES_IMPORT_COURSE
from django.conf import settings
from django.contrib.auth.decorators import login_required
@@ -32,8 +33,9 @@
from user_tasks.conf import settings as user_tasks_settings
from user_tasks.models import UserTaskArtifact, UserTaskStatus
+from openedx.core.djangoapps.authz.constants import LegacyAuthoringPermission
+from openedx.core.djangoapps.authz.decorators import user_has_course_permission
from common.djangoapps.edxmako.shortcuts import render_to_response
-from common.djangoapps.student.auth import has_course_author_access
from common.djangoapps.util.json_request import JsonResponse
from common.djangoapps.util.monitoring import monitor_import_failure
from common.djangoapps.util.views import ensure_valid_course_key
@@ -87,7 +89,12 @@ def import_handler(request, course_key_string):
successful_url = reverse_course_url('course_handler', courselike_key)
context_name = 'context_course'
courselike_block = modulestore().get_course(courselike_key)
- if not has_course_author_access(request.user, courselike_key):
+ if not user_has_course_permission(
+ user=request.user,
+ authz_permission=COURSES_IMPORT_COURSE.identifier,
+ course_key=courselike_key,
+ legacy_permission=LegacyAuthoringPermission.WRITE
+ ):
raise PermissionDenied()
if 'application/json' in request.META.get('HTTP_ACCEPT', 'application/json'):
@@ -257,7 +264,12 @@ def import_status_handler(request, course_key_string, filename=None):
"""
course_key = CourseKey.from_string(course_key_string)
- if not has_course_author_access(request.user, course_key):
+ if not user_has_course_permission(
+ user=request.user,
+ authz_permission=COURSES_IMPORT_COURSE.identifier,
+ course_key=course_key,
+ legacy_permission=LegacyAuthoringPermission.WRITE
+ ):
raise PermissionDenied()
# The task status record is authoritative once it's been created
@@ -318,7 +330,12 @@ def export_handler(request, course_key_string):
a link appearing on the page once it's ready.
"""
course_key = CourseKey.from_string(course_key_string)
- if not has_course_author_access(request.user, course_key):
+ if not user_has_course_permission(
+ user=request.user,
+ authz_permission=COURSES_EXPORT_COURSE.identifier,
+ course_key=course_key,
+ legacy_permission=LegacyAuthoringPermission.WRITE
+ ):
raise PermissionDenied()
library = isinstance(course_key, LibraryLocator)
if library:
@@ -373,7 +390,12 @@ def export_status_handler(request, course_key_string):
returned.
"""
course_key = CourseKey.from_string(course_key_string)
- if not has_course_author_access(request.user, course_key):
+ if not user_has_course_permission(
+ user=request.user,
+ authz_permission=COURSES_EXPORT_COURSE.identifier,
+ course_key=course_key,
+ legacy_permission=LegacyAuthoringPermission.WRITE
+ ):
raise PermissionDenied()
# The task status record is authoritative once it's been created
@@ -435,7 +457,12 @@ def export_output_handler(request, course_key_string):
filesystem instead of an external service like S3.
"""
course_key = CourseKey.from_string(course_key_string)
- if not has_course_author_access(request.user, course_key):
+ if not user_has_course_permission(
+ user=request.user,
+ authz_permission=COURSES_EXPORT_COURSE.identifier,
+ course_key=course_key,
+ legacy_permission=LegacyAuthoringPermission.WRITE
+ ):
raise PermissionDenied()
task_status = _latest_task_status(request, course_key_string, export_output_handler)
diff --git a/cms/djangoapps/contentstore/views/tests/test_import_export.py b/cms/djangoapps/contentstore/views/tests/test_import_export.py
index 0f11338a4c59..03678dcff2a0 100644
--- a/cms/djangoapps/contentstore/views/tests/test_import_export.py
+++ b/cms/djangoapps/contentstore/views/tests/test_import_export.py
@@ -29,9 +29,13 @@
from path import Path as path
from storages.backends.s3boto3 import S3Boto3Storage
from user_tasks.models import UserTaskStatus
+from rest_framework import status
+from rest_framework.test import APIClient
+
from cms.djangoapps.contentstore import toggles
from cms.djangoapps.contentstore import errors as import_error
+from cms.djangoapps.contentstore.api.tests.base import BaseCourseViewTest
from cms.djangoapps.contentstore.storage import course_import_export_storage
from cms.djangoapps.contentstore.tests.test_libraries import LibraryTestCase
from cms.djangoapps.contentstore.tests.utils import CourseTestCase
@@ -39,8 +43,11 @@
from cms.djangoapps.models.settings.course_metadata import CourseMetadata
from common.djangoapps.student import auth
from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole
+from common.djangoapps.student.tests.factories import UserFactory
from common.djangoapps.util import milestones_helpers
+from openedx.core.djangoapps.authz.tests.mixins import CourseAuthzTestMixin
from openedx.core.lib.extract_archive import safe_extractall
+from openedx_authz.constants.roles import COURSE_STAFF
from xmodule.contentstore.django import contentstore
from xmodule.modulestore import LIBRARY_ROOT, ModuleStoreEnum
from xmodule.modulestore.django import modulestore
@@ -229,11 +236,11 @@ def setUpClass(cls):
cls.VerifyingError = -2
cls.UpdatingError = -3
- def assertImportStatusResponse(self, response, status=None, expected_message=None):
+ def assertImportStatusResponse(self, response, expected_status=None, expected_message=None):
"""
Fail if the import response does not match with the provided status and message.
"""
- self.assertEqual(response["ImportStatus"], status)
+ self.assertEqual(response["ImportStatus"], expected_status)
if expected_message:
self.assertEqual(response['Message'], expected_message)
@@ -774,8 +781,8 @@ def test_export_async(self):
self.assertEqual(resp.status_code, 200)
resp = self.client.get(self.status_url)
result = json.loads(resp.content.decode('utf-8'))
- status = result['ExportStatus']
- self.assertEqual(status, 3)
+ res_status = result['ExportStatus']
+ self.assertEqual(res_status, 3)
self.assertIn('ExportOutput', result)
output_url = result['ExportOutput']
resp = self.client.get(output_url)
@@ -1367,3 +1374,305 @@ def test_problem_content_on_course_export_import(self, problem_data, expected_pr
)
self.assert_problem_definition(dest_course.location, expected_problem_content)
+
+
+class ImportAuthzTest(CourseAuthzTestMixin, BaseCourseViewTest):
+ """
+ Tests Course Import Course authorization using openedx-authz.
+ """
+
+ view_name = 'import_handler'
+ course_key_arg_name = 'course_key_string'
+ authz_roles_to_assign = [COURSE_STAFF.external_key]
+
+
+ def setUp(self):
+ super().setUp()
+
+ self.content_dir = path(tempfile.mkdtemp())
+ self.addCleanup(shutil.rmtree, self.content_dir)
+
+ # Create tar test files -----------------------------------------------
+ # OK course:
+ good_dir = tempfile.mkdtemp(dir=self.content_dir)
+ # test course being deeper down than top of tar file
+ embedded_dir = os.path.join(good_dir, "grandparent", "parent")
+ os.makedirs(os.path.join(embedded_dir, "course"))
+ with open(os.path.join(embedded_dir, "course.xml"), "w+") as f:
+ f.write('')
+
+ with open(os.path.join(embedded_dir, "course", "2013_Spring.xml"), "w+") as f:
+ f.write('')
+
+ self.file_to_upload = os.path.join(self.content_dir, "good.tar.gz")
+ with tarfile.open(self.file_to_upload, "w:gz") as gtar:
+ gtar.add(good_dir)
+
+ def import_file_in_course(self, client, course_key: str = None):
+ """Helper method to import provided file in the course."""
+ with open(self.file_to_upload, 'rb') as file_data:
+ args = {"name": self.file_to_upload, "course-data": [file_data]}
+ course_key = course_key or str(self.course_key)
+ url = self.get_url(course_key)
+ return client.post(url, args)
+
+ def test_authorized_user_can_access(self):
+ """User with COURSE_STAFF role can access."""
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.import_file_in_course(self.authorized_client)
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_unauthorized_user_cannot_access(self):
+ """User without role cannot access."""
+ self.unauthorized_client.login(username=self.unauthorized_user.username, password=self.password)
+ resp = self.import_file_in_course(self.unauthorized_client)
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_role_scoped_to_course(self):
+ """Authorization should only apply to the assigned course."""
+ other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.staff.id)
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.import_file_in_course(self.authorized_client, other_course.id)
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_staff_user_allowed_via_legacy(self):
+ """
+ Staff users should still pass through legacy fallback.
+ """
+ self.client.login(username=self.staff.username, password=self.password)
+ resp = self.import_file_in_course(self.client)
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_superuser_allowed(self):
+ """Superusers should always be allowed."""
+ superuser = UserFactory(is_superuser=True, username='superuser', password=self.password)
+
+ client = APIClient()
+ client.login(username=superuser.username, password=self.password)
+
+ resp = self.import_file_in_course(client)
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+
+class ImportStatusAuthzTest(CourseAuthzTestMixin, BaseCourseViewTest):
+ """
+ Tests Course Import Course Staus authorization using openedx-authz.
+ """
+
+ view_name = 'import_status_handler'
+ course_key_arg_name = 'course_key_string'
+ extra_request_args = {'filename': 'test.xml'}
+ authz_roles_to_assign = [COURSE_STAFF.external_key]
+
+ def test_authorized_user_can_access(self):
+ """User with COURSE_STAFF role can access."""
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_unauthorized_user_cannot_access(self):
+ """User without role cannot access."""
+ self.unauthorized_client.login(username=self.unauthorized_user.username, password=self.password)
+ resp = self.unauthorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_role_scoped_to_course(self):
+ """Authorization should only apply to the assigned course."""
+ other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.staff.id)
+
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.get(self.get_url(other_course.id))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_staff_user_allowed_via_legacy(self):
+ """
+ Staff users should still pass through legacy fallback.
+ """
+ self.client.login(username=self.staff.username, password=self.password)
+
+ resp = self.client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_superuser_allowed(self):
+ """Superusers should always be allowed."""
+ superuser = UserFactory(is_superuser=True, username='superuser', password=self.password)
+
+ client = APIClient()
+ client.login(username=superuser.username, password=self.password)
+
+ resp = client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+
+class ExportStatusAuthzTest(CourseAuthzTestMixin, BaseCourseViewTest):
+ """
+ Tests Course Export Course Status authorization using openedx-authz.
+ """
+
+ view_name = 'export_status_handler'
+ course_key_arg_name = 'course_key_string'
+ authz_roles_to_assign = [COURSE_STAFF.external_key]
+
+ def test_authorized_user_can_access(self):
+ """User with COURSE_STAFF role can access."""
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_unauthorized_user_cannot_access(self):
+ """User without role cannot access."""
+ self.unauthorized_client.login(username=self.unauthorized_user.username, password=self.password)
+ resp = self.unauthorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_role_scoped_to_course(self):
+ """Authorization should only apply to the assigned course."""
+ other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.staff.id)
+
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.get(self.get_url(other_course.id))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_staff_user_allowed_via_legacy(self):
+ """
+ Staff users should still pass through legacy fallback.
+ """
+ self.client.login(username=self.staff.username, password=self.password)
+
+ resp = self.client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_superuser_allowed(self):
+ """Superusers should always be allowed."""
+ superuser = UserFactory(is_superuser=True, username='superuser', password=self.password)
+
+ client = APIClient()
+ client.login(username=superuser.username, password=self.password)
+
+ resp = client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+
+class ExportAuthzTest(CourseAuthzTestMixin, BaseCourseViewTest):
+ """
+ Tests Course Export Course authorization using openedx-authz.
+ """
+
+ view_name = 'export_handler'
+ course_key_arg_name = 'course_key_string'
+ authz_roles_to_assign = [COURSE_STAFF.external_key]
+
+ def test_authorized_user_can_access(self):
+ """User with COURSE_STAFF role can access."""
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.post(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_unauthorized_user_cannot_access(self):
+ """User without role cannot access."""
+ self.unauthorized_client.login(username=self.unauthorized_user.username, password=self.password)
+ resp = self.unauthorized_client.post(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_role_scoped_to_course(self):
+ """Authorization should only apply to the assigned course."""
+ other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.staff.id)
+
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.post(self.get_url(other_course.id))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_staff_user_allowed_via_legacy(self):
+ """
+ Staff users should still pass through legacy fallback.
+ """
+ self.client.login(username=self.staff.username, password=self.password)
+
+ resp = self.client.post(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_superuser_allowed(self):
+ """Superusers should always be allowed."""
+ superuser = UserFactory(is_superuser=True, username='superuser', password=self.password)
+
+ client = APIClient()
+ client.login(username=superuser.username, password=self.password)
+
+ resp = client.post(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+
+class ExportOutputAuthzTest(CourseAuthzTestMixin, BaseCourseViewTest):
+ """
+ Tests Course Export Course Output authorization using openedx-authz.
+ """
+
+ view_name = 'export_output_handler'
+ course_key_arg_name = 'course_key_string'
+ authz_roles_to_assign = [COURSE_STAFF.external_key]
+
+ def _mock_artifact(self, spec=None, file_url=None):
+ """
+ Creates a Mock of the UserTaskArtifact model for testing exports handler
+ code without touching the database.
+ """
+ mock_artifact = Mock()
+ mock_artifact.file.name = 'testfile.tar.gz'
+ mock_artifact.file.storage = Mock(spec=spec)
+ mock_artifact.file.storage.url.return_value = file_url
+ return mock_artifact
+
+ def test_authorized_user_can_access(self):
+ """User with COURSE_STAFF role can access."""
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ self.authorized_client.post(reverse_course_url('export_handler', self.course_key))
+ resp = self.authorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_unauthorized_user_cannot_access(self):
+ """User without role cannot access."""
+ self.unauthorized_client.login(username=self.unauthorized_user.username, password=self.password)
+ resp = self.unauthorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_role_scoped_to_course(self):
+ """Authorization should only apply to the assigned course."""
+ other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.staff.id)
+ self.authorized_client.login(username=self.authorized_user.username, password=self.password)
+ resp = self.authorized_client.get(self.get_url(other_course.id))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_staff_user_allowed_via_legacy(self):
+ """
+ Staff users should still pass through legacy fallback.
+ """
+ self.client.login(username=self.staff.username, password=self.password)
+ self.client.post(reverse_course_url('export_handler', self.course_key))
+ resp = self.client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ @patch('user_tasks.models.UserTaskArtifact.objects.get')
+ @patch('cms.djangoapps.contentstore.views.import_export._latest_task_status')
+ @patch('cms.djangoapps.contentstore.views.import_export.course_import_export_storage')
+ def test_superuser_allowed(
+ self,
+ mock_get_user_task_artifact,
+ mock_latest_task_status,
+ mock_storage,
+ ):
+ """Superusers should always be allowed."""
+ mock_latest_task_status.return_value = Mock(state=UserTaskStatus.SUCCEEDED)
+ mock_get_user_task_artifact.return_value = self._mock_artifact(
+ file_url='/path/to/testfile.tar.gz',
+ )
+ mock_tarball = Mock()
+ mock_tarball.name = 'testfile.tar.gz'
+ mock_storage.open.return_value = mock_tarball
+ mock_storage.size.return_value = 0
+
+ superuser = UserFactory(is_superuser=True, username='superuser', password=self.password)
+
+ client = APIClient()
+ client.login(username=superuser.username, password=self.password)
+ resp = client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
diff --git a/openedx/core/djangoapps/authz/tests/mixins.py b/openedx/core/djangoapps/authz/tests/mixins.py
index c2dc32277b2a..90e7d99cac3e 100644
--- a/openedx/core/djangoapps/authz/tests/mixins.py
+++ b/openedx/core/djangoapps/authz/tests/mixins.py
@@ -42,8 +42,8 @@ def setUp(self):
self._seed_database_with_policies()
- self.authorized_user = UserFactory()
- self.unauthorized_user = UserFactory()
+ self.authorized_user = UserFactory(username='authorized', password='test')
+ self.unauthorized_user = UserFactory(username='unauthorized', password='test')
for role in self.authz_roles_to_assign:
assign_role_to_user_in_scope(
diff --git a/openedx/core/djangoapps/content_tagging/auth.py b/openedx/core/djangoapps/content_tagging/auth.py
index 4a1157c3bff3..75e91f2faaf0 100644
--- a/openedx/core/djangoapps/content_tagging/auth.py
+++ b/openedx/core/djangoapps/content_tagging/auth.py
@@ -1,12 +1,37 @@
"""
Functions to validate the access in content tagging actions
"""
+import logging
-
+from opaque_keys import InvalidKeyError
+from opaque_keys.edx.keys import CourseKey
+from openedx_authz import api as authz_api
+from openedx_authz.constants.permissions import COURSES_EXPORT_TAGS
from openedx_tagging import rules as oel_tagging_rules
+from openedx.core import toggles as core_toggles
+
+log = logging.getLogger(__name__)
+
def has_view_object_tags_access(user, object_id):
+ """
+ Check if the user has access to view object tags for the given object.
+ """
+ # If authz is enabled, check for the export tags authz permission
+ course_key = None
+ try:
+ course_key = CourseKey.from_string(object_id)
+ except InvalidKeyError:
+ log.warning("Invalid course key %s", object_id)
+
+ if course_key and core_toggles.enable_authz_course_authoring(course_key) and not authz_api.is_user_allowed(
+ user.username, COURSES_EXPORT_TAGS.identifier, str(course_key)
+ ):
+ return False
+
+
+ # Always check for tagging permissions
return user.has_perm(
"oel_tagging.view_objecttag",
# The obj arg expects a model, but we are passing an object
diff --git a/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py b/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py
index b40ad5d55156..952ed4fbb5ff 100644
--- a/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py
+++ b/openedx/core/djangoapps/content_tagging/rest_api/v1/tests/test_views.py
@@ -13,14 +13,17 @@
import ddt
from django.contrib.auth import get_user_model
from django.core.files.uploadedfile import SimpleUploadedFile
+from django.urls import reverse
from edx_django_utils.cache import RequestCache
from opaque_keys.edx.locator import BlockUsageLocator, CourseLocator, LibraryCollectionLocator, LibraryContainerLocator
+from openedx_authz.constants.roles import COURSE_STAFF
from openedx_tagging.models import Tag, Taxonomy
from openedx_tagging.models.system_defined import SystemDefinedTaxonomy
from openedx_tagging.rest_api.v1.serializers import TaxonomySerializer
from organizations.models import Organization
from rest_framework import status
-from rest_framework.test import APITestCase
+from rest_framework.test import APITestCase, APIClient
+
from common.djangoapps.student.auth import add_users, update_org_role
from common.djangoapps.student.roles import (
@@ -31,7 +34,10 @@
OrgLibraryUserRole,
OrgStaffRole
)
-from common.djangoapps.student.tests.factories import UserFactory
+from common.djangoapps.student.tests.factories import StaffFactory, UserFactory
+from openedx.core.djangoapps.authz.tests.mixins import CourseAuthzTestMixin
+from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
+from xmodule.modulestore.tests.factories import CourseFactory
from openedx.core.djangoapps.content_libraries.api import AccessLevel, create_library, set_library_user_permissions
from openedx.core.djangoapps.content_tagging import api as tagging_api
from openedx.core.djangoapps.content_tagging.models import TaxonomyOrg
@@ -2051,6 +2057,55 @@ def test_export_course_invalid_id(self) -> None:
response = self.client.get(url)
assert response.status_code == status.HTTP_403_FORBIDDEN
+@skip_unless_cms
+class TestContentObjectChildrenExportViewWithAuthz(CourseAuthzTestMixin, SharedModuleStoreTestCase, APITestCase):
+ """
+ Tests Tags Export in Course authorization using openedx-authz.
+ """
+
+ authz_roles_to_assign = [COURSE_STAFF.external_key]
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.password = 'test'
+ cls.course = CourseFactory.create()
+ cls.course_key = cls.course.id
+ cls.staff = StaffFactory(course_key=cls.course_key, password=cls.password)
+
+ def get_url(self, course_key):
+ return reverse('content_tagging:taxonomy-object-tag-export', kwargs={'context_id': course_key})
+
+ def test_authorized_user_can_access(self):
+ """User with COURSE_STAFF role can access."""
+ resp = self.authorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_unauthorized_user_cannot_access(self):
+ """User without role cannot access."""
+ resp = self.unauthorized_client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_role_scoped_to_course(self):
+ """Authorization should only apply to the assigned course."""
+ other_course = self.store.create_course("OtherOrg", "OtherCourse", "Run", self.staff.id)
+
+ resp = self.authorized_client.get(self.get_url(other_course.id))
+ self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN)
+
+ def test_staff_user_allowed_via_legacy(self):
+ """Staff users should still pass through legacy fallback."""
+ self.client.force_authenticate(user=self.staff)
+ resp = self.client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
+
+ def test_superuser_allowed(self):
+ """Superusers should always be allowed."""
+ superuser = UserFactory(is_superuser=True)
+ client = APIClient()
+ client.force_authenticate(user=superuser)
+ resp = client.get(self.get_url(self.course_key))
+ self.assertEqual(resp.status_code, status.HTTP_200_OK)
@skip_unless_cms
@ddt.ddt
diff --git a/openedx/core/djangoapps/content_tagging/rest_api/v1/urls.py b/openedx/core/djangoapps/content_tagging/rest_api/v1/urls.py
index a59bcdf2dbf6..4ed33e3ae31b 100644
--- a/openedx/core/djangoapps/content_tagging/rest_api/v1/urls.py
+++ b/openedx/core/djangoapps/content_tagging/rest_api/v1/urls.py
@@ -29,6 +29,7 @@
path(
"object_tags//export/",
views.ObjectTagExportView.as_view(),
+ name="taxonomy-object-tag-export",
),
path('', include(router.urls))
]