diff --git a/web/pgadmin/browser/server_groups/__init__.py b/web/pgadmin/browser/server_groups/__init__.py index 50c14c6d17b..3c0c8d2563c 100644 --- a/web/pgadmin/browser/server_groups/__init__.py +++ b/web/pgadmin/browser/server_groups/__init__.py @@ -29,18 +29,14 @@ get_server_groups_for_user -def get_icon_css_class(group_id, group_user_id, - default_val='icon-server_group'): +def get_icon_css_class(is_shared_group, default_val='icon-server_group'): """ Returns css value - :param group_id: - :param group_user_id: + :param is_shared_group: :param default_val: :return: default_val """ - if (config.SERVER_MODE and - group_user_id != current_user.id and - ServerGroupModule.has_shared_server(group_id)): + if (config.SERVER_MODE and is_shared_group): default_val = 'icon-server_group_shared' return default_val, True @@ -66,39 +62,24 @@ def csssnippets(self): return snippets - @staticmethod - def has_shared_server(gid): - """ - To check whether given server group contains shared server or not - :param gid: - :return: True if servergroup contains shared server else false - """ - servers = Server.query.filter_by(servergroup_id=gid) - for s in servers: - if s.shared: - return True - return False - def get_nodes(self, *arg, **kwargs): """Return a JSON document listing the server groups for the user""" - if config.SERVER_MODE: - groups = ServerGroupView.get_all_server_groups() - else: - groups = ServerGroup.query.filter_by( - user_id=current_user.id - ).order_by("id") + groups = ServerGroupView.get_all_server_groups() - for idx, group in enumerate(groups): - icon_class, is_shared = get_icon_css_class(group.id, group.user_id) + for group in groups: + icon_class, is_shared = get_icon_css_class(group.is_shared_group) yield self.generate_browser_node( - "%d" % (group.id), None, + "%d" % (group.id), + None, group.name, icon_class, True, self.node_type, - can_delete=True if idx > 0 else False, - user_id=group.user_id, + can_delete=( + not (group.is_first_user_group or group.is_shared_group) + ), + can_edit=(not group.is_shared_group), is_shared=is_shared ) @@ -202,6 +183,7 @@ def delete(self, gid): success=0, errormsg=gettext( 'The specified server group cannot be deleted.' + ' Shared servers are present in this group.' ) ) @@ -211,6 +193,7 @@ def delete(self, gid): success=0, errormsg=gettext( 'The specified server group cannot be deleted.' + ' The first server group is not deletable.' ) ) @@ -239,10 +222,8 @@ def delete(self, gid): def update(self, gid): """Update the server-group properties""" - # There can be only one record at most - servergroup = ServerGroup.query.filter_by( - user_id=current_user.id, - id=gid).first() + # There can be only one record at most (only owned) + servergroup = get_server_group(gid, hide_shared=True) data = request.form if request.form else json.loads( request.data @@ -259,18 +240,20 @@ def update(self, gid): if 'name' in data: servergroup.name = data['name'] db.session.commit() + except exc.IntegrityError: db.session.rollback() return bad_request(gettext( "The specified server group already exists." )) + except Exception as e: db.session.rollback() return make_json_response( status=410, success=0, errormsg=str(e) ) - icon_class, is_shared = get_icon_css_class(gid, servergroup.user_id) + icon_class, is_shared = get_icon_css_class(servergroup.is_shared_group) return jsonify( node=self.blueprint.generate_browser_node( gid, @@ -279,7 +262,13 @@ def update(self, gid): icon_class, True, self.node_type, - can_delete=True, # This is user created hence can delete + can_delete=( + not ( + servergroup.is_first_user_group or + servergroup.is_shared_group + ) + ), + can_edit=(not servergroup.is_shared_group), is_shared=is_shared ) ) @@ -315,12 +304,17 @@ def create(self): user_id=current_user.id, name=data['name']) db.session.add(sg) + db.session.flush() + group_id = sg.id db.session.commit() + # Reload with access metadata attached + sg = get_server_group(group_id) + data['id'] = sg.id data['name'] = sg.name - icon_class, is_shared = get_icon_css_class(sg.id, sg.user_id) + icon_class, is_shared = get_icon_css_class(sg.is_shared_group) return jsonify( node=self.blueprint.generate_browser_node( "%d" % sg.id, @@ -329,8 +323,13 @@ def create(self): icon_class, True, self.node_type, - # This is user created hence can deleted - can_delete=True, + can_delete=( + not ( + sg.is_first_user_group or + sg.is_shared_group + ) + ), + can_edit=(not sg.is_shared_group), is_shared=is_shared ) ) @@ -385,20 +384,12 @@ def get_all_server_groups(): # Don't display shared server if user has # selected 'Hide shared server' pref = Preferences.module('browser') - hide_shared_server = pref.preference('hide_shared_server').get() - - server_groups = get_server_groups_for_user() - - if hide_shared_server: - groups = [] - for group in server_groups: - if group.user_id != current_user.id and \ - ServerGroupModule.has_shared_server(group.id): - continue - groups.append(group) - return groups + pref_item = pref.preference('hide_shared_server') + hide_shared_server = ( + pref_item.get() if pref_item is not None else False + ) - return server_groups + return get_server_groups_for_user(hide_shared_server) @pga_login_required def nodes(self, gid=None): @@ -406,14 +397,15 @@ def nodes(self, gid=None): nodes = [] if gid is None: if config.SERVER_MODE: - groups = self.get_all_server_groups() + else: - groups = ServerGroup.query.filter_by(user_id=current_user.id) + groups = get_server_groups_for_user(hide_shared=True) for group in groups: - icon_class, is_shared = get_icon_css_class(group.id, - group.user_id) + icon_class, is_shared = get_icon_css_class( + group.is_shared_group + ) nodes.append( self.blueprint.generate_browser_node( "%d" % group.id, @@ -422,6 +414,13 @@ def nodes(self, gid=None): icon_class, True, self.node_type, + can_delete=( + not ( + group.is_first_user_group or + group.is_shared_group + ) + ), + can_edit=(not group.is_shared_group), is_shared=is_shared ) ) @@ -433,14 +432,21 @@ def nodes(self, gid=None): errormsg=gettext("Could not find the server group.") ) - icon_class, is_shared = get_icon_css_class(group.id, - group.user_id) + icon_class, is_shared = get_icon_css_class(group.is_shared_group) nodes = self.blueprint.generate_browser_node( - "%d" % (group.id), None, + "%d" % (group.id), + None, group.name, icon_class, True, self.node_type, + can_delete=( + not ( + group.is_first_user_group or + group.is_shared_group + ) + ), + can_edit=(not group.is_shared_group), is_shared=is_shared ) diff --git a/web/pgadmin/browser/server_groups/static/js/server_group.js b/web/pgadmin/browser/server_groups/static/js/server_group.js index b822781cb3e..7ead306945f 100644 --- a/web/pgadmin/browser/server_groups/static/js/server_group.js +++ b/web/pgadmin/browser/server_groups/static/js/server_group.js @@ -7,12 +7,11 @@ // ////////////////////////////////////////////////////////////// import ServerGroupSchema from './server_group.ui'; -import _ from 'lodash'; define('pgadmin.node.server_group', [ 'sources/gettext', 'sources/url_for', - 'sources/pgadmin', 'pgadmin.user_management.current_user', 'pgadmin.browser', 'pgadmin.browser.node', -], function(gettext, url_for, pgAdmin, current_user) { + 'sources/pgadmin', 'pgadmin.browser', 'pgadmin.browser.node', +], function(gettext, url_for, pgAdmin) { if (!pgAdmin.Browser.Nodes['server_group']) { pgAdmin.Browser.Nodes['server_group'] = pgAdmin.Browser.Node.extend({ @@ -39,17 +38,14 @@ define('pgadmin.node.server_group', [ }]); }, getSchema: ()=>new ServerGroupSchema(), + canEdit: function(itemData) { + return (itemData && itemData.can_edit); + }, canDrop: function(itemData) { - let serverOwner = itemData.user_id; - return !(serverOwner != current_user.id && !_.isUndefined(serverOwner)); + return (itemData && itemData.can_delete); }, dropAsRemove: true, - canDelete: function(i) { - let s = pgAdmin.Browser.tree.siblings(i, true); - /* This is the only server group - we can't remove it*/ - return !(!s || s.length == 0); - }, }); } diff --git a/web/pgadmin/browser/server_groups/tests/test_sg_permissions.py b/web/pgadmin/browser/server_groups/tests/test_sg_permissions.py new file mode 100644 index 00000000000..2570e87a889 --- /dev/null +++ b/web/pgadmin/browser/server_groups/tests/test_sg_permissions.py @@ -0,0 +1,353 @@ +########################################################################## +# +# pgAdmin 4 - PostgreSQL Tools +# +# Copyright (C) 2013 - 2026, The pgAdmin Development Team +# This software is released under the PostgreSQL Licence +# +########################################################################## + +"""Tests for server group permissions after the server_access refactor.""" + +import json +import config +from pgadmin.utils.route import BaseTestGenerator +from regression.test_setup import config_data +from regression.python_test_utils.test_utils import \ + create_user_wise_test_client +from pgadmin.model import db, ServerGroup, Server + +test_user_details = None +if config.SERVER_MODE: + test_user_details = \ + config_data['pgAdmin4_test_non_admin_credentials'] + +SG_OBJ_URL = '/browser/server_group/obj/' +SG_NODES_URL = '/browser/server_group/nodes/' + + +def _create_server_group(tester, name): + """Create a server group and return (status_code, response_data).""" + response = tester.post( + SG_OBJ_URL, + data=json.dumps({'name': name}), + content_type='html/json' + ) + response_data = json.loads(response.data.decode('utf-8')) + if response.status_code == 200 and 'node' in response_data: + node = _find_node_by_label( + _get_nodes(tester)[1], name + ) + if node is not None: + response_data['node'] = node + return response.status_code, response_data + + +def _get_nodes(tester, sg_id=None): + """Fetch server group node(s) from the nodes endpoint.""" + url = SG_NODES_URL if sg_id is None else SG_NODES_URL + str(sg_id) + response = tester.get(url, content_type='html/json') + response_data = json.loads(response.data.decode('utf-8')) + return response.status_code, response_data.get('data') + + +def _find_node(nodes, sg_id): + """Find a node by _id in a list or return the dict if it matches.""" + sg_id = str(sg_id) + if isinstance(nodes, dict): + return nodes if str(nodes.get('_id')) == sg_id else None + for node in nodes or []: + if str(node.get('_id')) == sg_id: + return node + return None + + +def _find_node_by_label(nodes, label): + """Find a node by label in a list.""" + if isinstance(nodes, dict): + return nodes if nodes.get('label') == label else None + for node in nodes or []: + if node.get('label') == label: + return node + return None + + +def _get_first_group_id(app): + """Return the lowest-id server group owned by the current user.""" + with app.app_context(): + sg = ServerGroup.query.order_by(ServerGroup.id).first() + return sg.id if sg else None + + +def _cleanup_server_group(app, sg_id): + """Delete a server group directly from the database.""" + if sg_id is None: + return + with app.app_context(): + sg = ServerGroup.query.filter_by(id=sg_id).first() + if sg: + db.session.delete(sg) + db.session.commit() + + +def _cleanup_shared_servers_in_group(tester, app, group_id): + """Remove shared servers left in a group by earlier tests in the suite.""" + with app.app_context(): + server_ids = [ + s.id for s in Server.query.filter_by( + servergroup_id=group_id, shared=True + ).all() + ] + for server_id in server_ids: + tester.delete( + '/browser/server/obj/{0}/{1}'.format(group_id, server_id) + ) + + +def _assert_owned_group_node(self, node): + """Assert metadata for a non-shared owned server group node.""" + self.assertTrue(node.get('can_edit')) + self.assertFalse(node.get('is_shared')) + self.assertEqual(node.get('icon'), 'icon-server_group') + + +def _assert_desktop_nodes_metadata(self, nodes): + """Assert desktop-mode metadata on all listed server group nodes.""" + self.assertIsInstance(nodes, list) + for node in nodes: + self.assertFalse(node.get('is_shared')) + self.assertTrue(node.get('can_edit')) + + +class OwnedGroupCrudTestCase(BaseTestGenerator): + """An owned group is visible, editable, and deletable.""" + + scenarios = [ + ('Owned group is visible, editable, and deletable', + dict(is_positive_test=True)), + ] + + def setUp(self): + super().setUp() + self.sg_id = None + status_code, response_data = _create_server_group( + self.tester, 'owned_test_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.sg_id = response_data['node']['_id'] + self.assertTrue( + response_data['node'].get('can_delete'), + 'Owned CRUD tests require a non-first server group.' + ) + + def runTest(self): + if not self.sg_id: + raise Exception("Server group not created") + + url = SG_OBJ_URL + str(self.sg_id) + response = self.tester.get(url, content_type='html/json') + self.assertEqual(response.status_code, 200) + + status_code, node = _get_nodes(self.tester, self.sg_id) + self.assertEqual(status_code, 200) + _assert_owned_group_node(self, node) + self.assertTrue(node.get('can_delete')) + + if not config.SERVER_MODE: + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + _assert_desktop_nodes_metadata(self, nodes) + + response = self.tester.put( + url, + data=json.dumps({'name': 'renamed_group'}), + content_type='html/json' + ) + self.assertEqual(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn('node', response_data) + self.assertEqual(response_data['node']['label'], 'renamed_group') + self.assertTrue(response_data['node'].get('can_edit')) + + response = self.tester.delete(url) + self.assertEqual(response.status_code, 200) + self.sg_id = None + + def tearDown(self): + _cleanup_server_group(self.app, self.sg_id) + + +class FirstGroupNotDeletableTestCase(BaseTestGenerator): + """The user's first server group cannot be deleted.""" + + scenarios = [ + ('First owned server group is not deletable', + dict(is_positive_test=False)), + ] + + def setUp(self): + super().setUp() + if config.SERVER_MODE: + # Other tests may leave shared servers in the default group; + # delete() checks that before the first-group guard. + _cleanup_shared_servers_in_group( + self.tester, self.app, config_data['server_group'] + ) + + def runTest(self): + first_group_id = _get_first_group_id(self.app) + self.assertIsNotNone(first_group_id) + + status_code, node = _get_nodes(self.tester, first_group_id) + self.assertEqual(status_code, 200) + self.assertFalse(node.get('can_delete')) + self.assertTrue(node.get('can_edit')) + self.assertFalse(node.get('is_shared')) + + if not config.SERVER_MODE: + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + _assert_desktop_nodes_metadata(self, nodes) + + url = SG_OBJ_URL + str(first_group_id) + response = self.tester.delete(url) + self.assertEqual(response.status_code, 417) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn( + 'The first server group is not deletable.', + response_data.get('errormsg', '') + ) + + status_code, response_data = _create_server_group( + self.tester, 'second_test_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.assertTrue(response_data['node'].get('can_delete')) + if not config.SERVER_MODE: + self.assertFalse(response_data['node'].get('is_shared')) + + second_group_id = response_data['node']['_id'] + _cleanup_server_group(self.app, second_group_id) + + +class SharedGroupPermissionsTestCase(BaseTestGenerator): + """A group with another user's shared server is visible but not + editable or deletable (server mode only).""" + + scenarios = [ + ('Shared server group is visible with shared flags only', + dict(is_positive_test=True)), + ] + + def setUp(self): + super().setUp() + self.sg_id = None + self.server_id = None + if not config.SERVER_MODE: + self.skipTest( + 'Shared group permission tests only apply to server mode.' + ) + + status_code, response_data = _create_server_group( + self.tester, 'shared_host_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.sg_id = response_data['node']['_id'] + + self.server['shared'] = True + url = '/browser/server/obj/{0}/'.format(self.sg_id) + response = self.tester.post( + url, + data=json.dumps(self.server), + content_type='html/json' + ) + self.assertEqual(response.status_code, 200) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn('node', response_data) + self.server_id = response_data['node']['_id'] + + @create_user_wise_test_client(test_user_details) + def runTest(self): + if not self.sg_id: + raise Exception("Server group not created") + + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + node = _find_node(nodes, self.sg_id) + self.assertIsNotNone( + node, + 'Shared server group should be visible to non-owner' + ) + self.assertTrue(node.get('is_shared')) + self.assertEqual(node.get('icon'), 'icon-server_group_shared') + self.assertFalse(node.get('can_edit')) + self.assertFalse(node.get('can_delete')) + + url = SG_OBJ_URL + str(self.sg_id) + response = self.tester.get(url, content_type='html/json') + self.assertEqual(response.status_code, 200) + + response = self.tester.put( + url, + data=json.dumps({'name': 'should_not_rename'}), + content_type='html/json' + ) + self.assertEqual(response.status_code, 417) + + response = self.tester.delete(url) + self.assertEqual(response.status_code, 417) + response_data = json.loads(response.data.decode('utf-8')) + self.assertIn( + 'Shared servers are present', + response_data.get('errormsg', '') + ) + + def tearDown(self): + if self.server_id is not None: + url = '/browser/server/obj/{0}/{1}'.format( + self.sg_id, self.server_id) + self.__class__.tester.delete(url) + _cleanup_server_group(self.app, self.sg_id) + + +class DesktopGroupsMetadataTestCase(BaseTestGenerator): + """Desktop mode returns owned groups with is_shared=False.""" + + scenarios = [ + ('Desktop groups report is_shared=False', + dict(is_positive_test=True)), + ] + + def setUp(self): + super().setUp() + self.sg_id = None + if config.SERVER_MODE: + self.skipTest( + 'Desktop metadata tests only apply to desktop mode.' + ) + + def runTest(self): + status_code, nodes = _get_nodes(self.tester) + self.assertEqual(status_code, 200) + _assert_desktop_nodes_metadata(self, nodes) + + first_group_id = _get_first_group_id(self.app) + self.assertIsNotNone(first_group_id) + first_node = _find_node(nodes, first_group_id) + self.assertIsNotNone(first_node) + self.assertFalse(first_node.get('can_delete')) + + status_code, response_data = _create_server_group( + self.tester, 'desktop_second_group' + ) + self.assertEqual(status_code, 200) + self.assertIn('node', response_data) + self.assertFalse(response_data['node'].get('is_shared')) + self.assertTrue(response_data['node'].get('can_delete')) + self.sg_id = response_data['node']['_id'] + + def tearDown(self): + _cleanup_server_group(self.app, self.sg_id) diff --git a/web/pgadmin/utils/server_access.py b/web/pgadmin/utils/server_access.py index efceca83c19..5d9dc74818c 100644 --- a/web/pgadmin/utils/server_access.py +++ b/web/pgadmin/utils/server_access.py @@ -14,7 +14,7 @@ have been explicitly shared with them via SharedServer entries. """ -from sqlalchemy import or_ +from sqlalchemy import or_, case, exists, func, literal from flask_security import current_user from pgadmin.model import db, Server, ServerGroup @@ -65,58 +65,122 @@ def get_server(sid, only_owned=False): ).first() -def get_server_group(gid): +def get_server_group(gid,hide_shared=False): """Fetch a server group by ID, verifying user access. - Returns the group if: - - Desktop mode, OR - - The user owns it, OR - - It contains shared servers (Server.shared=True). + See get_server_groups_for_user() docstring for the underlying access logic. + Returns the group if the user owns it, or if it contains shared servers. + Returns None otherwise. + """ + sg = get_server_groups_for_user(servergroup_id=gid, + hide_shared=hide_shared) + + if sg: + return sg[0] + + return None + - Returns None otherwise. The Administrator role does not grant - access to other users' private groups. +def get_server_groups_for_user(hide_shared=False, servergroup_id=None): + """Return a list for server groups visible to the current user. + + Args: + hide_shared: If True, only return groups owned by the current user. + servergroup_id: If provided, filter to a specific server group ID. + + See get_server_groups_for_user_query() docstring for the underlying query + logic. """ - if not config.SERVER_MODE: - return ServerGroup.query.filter_by(id=gid).first() - return ServerGroup.query.filter( - ServerGroup.id == gid, - or_( - ServerGroup.user_id == current_user.id, - ServerGroup.id.in_( - db.session.query(Server.servergroup_id).filter( - Server.shared - ) - ) - ) - ).first() + sg = get_server_groups_for_user_query(hide_shared=hide_shared, + servergroup_id=servergroup_id) + + result_list = [] + for row in sg.all(): + group = row.ServerGroup + group.is_shared_group = row.is_shared_group + group.is_first_user_group = row.is_first_user_group + result_list.append(group) -def get_server_groups_for_user(): - """Return server groups visible to the current user. + return result_list + + +def get_server_groups_for_user_query(hide_shared=False, servergroup_id=None): + """Return a query for server groups visible to the current user. Includes groups owned by the user plus groups containing shared servers (Server.shared=True, visible to all authenticated users). + is_shared_group is an additional column indicating if the group is a group + not owned by the user and contains shared servers. + The Administrator role does not grant visibility into other users' private groups — admins see the same set as a regular user with the same ownership and sharing configuration. """ + + ServerGroupAlias = db.aliased(ServerGroup) + + min_id_subquery = ( + db.session.query(func.min(ServerGroupAlias.id)) + .filter( + ServerGroupAlias.user_id == current_user.id + ) + .correlate(ServerGroup) + .scalar_subquery() + ) + + is_first_user_group = ( + (ServerGroup.id == min_id_subquery) + .label('is_first_user_group') + ) + if not config.SERVER_MODE: - return ServerGroup.query.filter_by( - user_id=current_user.id - ).all() + query = ServerGroup.query.add_columns( + literal(0).label('is_shared_group'), + is_first_user_group + ).filter(ServerGroup.user_id == current_user.id) - return ServerGroup.query.filter( - or_( - ServerGroup.user_id == current_user.id, - ServerGroup.id.in_( - db.session.query(Server.servergroup_id).filter( - Server.shared - ) + if servergroup_id is not None: + query = query.filter(ServerGroup.id == servergroup_id) + + return query.order_by(ServerGroup.id) + + query = ServerGroup.query.add_columns( + (ServerGroup.user_id != current_user.id) + .label('is_shared_group'), + is_first_user_group + ) + + if hide_shared: + query = query.filter(ServerGroup.user_id == current_user.id) + else: + has_shared_servers = ( + db.session.query(Server.id) + .filter( + Server.servergroup_id == ServerGroup.id, + Server.shared + ) + .exists() + ) + + query = query.filter( + or_( + ServerGroup.user_id == current_user.id, + has_shared_servers ) ) - ).all() + + if servergroup_id is not None: + query = query.filter(ServerGroup.id == servergroup_id) + + query = query.order_by( + case((ServerGroup.user_id == current_user.id, 0),else_=1), + ServerGroup.id + ) + + return query def get_user_server_query():