diff --git a/serveradmin/serverdb/query_committer.py b/serveradmin/serverdb/query_committer.py index 816e03e7..16004f23 100644 --- a/serveradmin/serverdb/query_committer.py +++ b/serveradmin/serverdb/query_committer.py @@ -418,7 +418,14 @@ def _acl_violations(touched_objects, pending_changes, acl): # Otherwise check if the ACL allows the "to be" object. to_compare = pending_changes - if not attribute_filter.matches(to_compare.get(attribute_id)): + attr_value = to_compare.get(attribute_id) + if isinstance(attr_value, (set, frozenset)): + matched = any( + attribute_filter.matches(v) for v in attr_value + ) + else: + matched = attribute_filter.matches(attr_value) + if not matched: violations.append( 'Object is not covered by ACL "{}", Attribute "{}" ' 'does not match the filter "{}".'.format( diff --git a/serveradmin/serverdb/tests/test_acls.py b/serveradmin/serverdb/tests/test_acls.py index 7e707ca9..43ff6c27 100644 --- a/serveradmin/serverdb/tests/test_acls.py +++ b/serveradmin/serverdb/tests/test_acls.py @@ -493,6 +493,82 @@ def test_deny_if_multiple_user_acls_cover_one_object_change_set(self): str(error.exception), ) + def test_permit_if_acl_filter_matches_multi_attribute(self): + # An ACL with a filter on a multi-value attribute should match when + # the object's set contains the filtered value. + user = User.objects.first() + + # First set a database value on the test object + q = Query({'object_id': 1}, ['database', 'os']) + obj = q.get() + obj['database'].add('mydb') + q.commit(user=user) + + # ACL filters on both servertype (single-value) and database (multi-value) + acl = AccessControlGroup.objects.create( + name='multi test', + query='hostname=test0 os=wheezy database=mydb', + is_whitelist=True, + ) + acl.attributes.set(Attribute.objects.filter(attribute_id='os')) + acl.members.set({user}) + acl.save() + + # Remove superuser permissions for test + user.is_superuser = False + user.save() + + unchanged_object = Query( + {'object_id': 1}, ['os', 'hostname', 'servertype', 'database'] + ).get() + unchanged_objects = {unchanged_object['object_id']: unchanged_object} + + changed_object = Query( + {'object_id': 1}, ['os', 'hostname', 'servertype', 'database'] + ).get() + changed_object['os'] = 'bookworm' + changed_objects = {changed_object['object_id']: changed_object} + + self.assertIsNone( + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + ) + + def test_deny_if_acl_filter_does_not_match_multi_attribute(self): + # An ACL with a filter on a multi-value attribute should NOT match + # when the object's set does not contain the filtered value. + user = User.objects.first() + + acl = AccessControlGroup.objects.create( + name='multi test', + query='hostname=test0 os=wheezy database=mydb', + is_whitelist=True, + ) + acl.attributes.add(Attribute.objects.get(attribute_id='os')) + acl.members.add(user) + acl.save() + + # Remove superuser permissions for test + user.is_superuser = False + user.save() + + unchanged_object = Query( + {'object_id': 1}, ['os', 'hostname', 'servertype', 'database'] + ).get() + unchanged_objects = {unchanged_object['object_id']: unchanged_object} + + changed_object = Query( + {'object_id': 1}, ['os', 'hostname', 'servertype', 'database'] + ).get() + changed_object['os'] = 'bookworm' + changed_objects = {changed_object['object_id']: changed_object} + + with self.assertRaises(PermissionDenied): + query_committer._access_control( + user, None, unchanged_objects, {}, changed_objects, {} + ) + def test_hijack_objects_not_possible(self): # When an application or user is allowed to change certain attributes # it must be guaranteed that permission checks are taking place against