Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ci/apiv2/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ def test_bulk_activate(self):
agents = [self.create_agent() for i in range(5)]
active_attributes = [True for i in range(5)]
Agent.objects.patch_many(agents, active_attributes, "isActive")

def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permAgentRead': True})
5 changes: 4 additions & 1 deletion ci/apiv2/test_agentassignment.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from hashtopolis import AgentAssignment

from hashtopolis_agent import DummyAgent
from utils import BaseTest, do_create_dummy_agent


Expand Down Expand Up @@ -47,3 +46,7 @@ def test_agent_assign_task(self):
self.assertEqual(len(check), 1)
self.assertEqual(check[0].agentId, agent.id)
self.assertEqual(check[0].taskId, task.id)

def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permAgentAssignmentRead': True})
7 changes: 7 additions & 0 deletions ci/apiv2/test_agentstat.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ def test_cpu_utilisation(self):
objs = AgentStat.objects.filter(agentId=agent.id, statType=3)
self.assertEqual(len(objs), 1)
self.assertListEqual(objs[0].value, cpu_utilisations)

def test_acl(self):
retval = self.create_agent_with_task()
agent = retval['agent']
stats = list(AgentStat.objects.filter(agentId=agent.id))
self.assertGreater(len(stats), 0, "Expected agent stats to exist for ACL test")
self._test_acl_list(stats[0], {'permAgentStatRead': True})
56 changes: 56 additions & 0 deletions ci/apiv2/test_apitoken.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from hashtopolis import ApiToken, HashtopolisError
from utils import BaseTest


class ApiTokenTest(BaseTest):
model_class = ApiToken

def create_test_object(self, *nargs, **kwargs):
return self.create_apitoken(*nargs, **kwargs)

def test_create(self):
model_obj = self.create_test_object(delete=False)
self._test_create(model_obj)

def test_token_returned_on_create(self):
model_obj = self.create_test_object(delete=False)
# The JWT token string is only present in the POST response
self.assertTrue(hasattr(model_obj, 'token'))
self.assertIsNotNone(model_obj.token)
self.assertIsInstance(model_obj.token, str)
self.assertGreater(len(model_obj.token), 0)

def test_token_not_in_get(self):
model_obj = self.create_test_object(delete=False)
# Retrieve the object via GET and verify the token field is absent
obj = self.model_class.objects.get(pk=model_obj.id)
self.assertFalse(hasattr(obj, 'token') and obj.token is not None)
Comment thread
jessevz marked this conversation as resolved.

def test_revoke(self):
model_obj = self.create_test_object(delete=False)
self._test_patch(model_obj, 'isRevoked', True)

def test_expand_user(self):
model_obj = self.create_test_object(delete=False)
self._test_expandables(model_obj, ['user'])

def test_patch_readonly_startValid(self):
model_obj = self.create_test_object(delete=False)
model_obj.startValid = 0
with self.assertRaises(HashtopolisError) as e:
model_obj.save()
self.assertEqual(e.exception.status_code, 403)
self.assertIn('startValid', e.exception.title)

def test_patch_readonly_endValid(self):
model_obj = self.create_test_object(delete=False)
model_obj.endValid = 9999999999
with self.assertRaises(HashtopolisError) as e:
model_obj.save()
self.assertEqual(e.exception.status_code, 403)
self.assertIn('endValid', e.exception.title)

def test_acl(self):
# Admin's token should not be visible to a different user
model_obj = self.create_test_object(delete=False)
self._test_acl_list(model_obj, {'permJwtApiKeyRead': True})
4 changes: 4 additions & 0 deletions ci/apiv2/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ def test_bulk_delete(self):
files = [self.create_test_object(delete=False) for i in range(5)]
File.objects.delete_many(files)

def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permFileRead': True})

def test_helper_rescan_global_files(self):
model_obj1 = self.create_test_object()
model_obj2 = self.create_test_object()
Expand Down
2 changes: 1 addition & 1 deletion ci/apiv2/test_hash.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from hashtopolis import Hash, HashtopolisResponseError, HashtopolisError
from hashtopolis import Hash, HashtopolisResponseError
from utils import BaseTest


Expand Down
4 changes: 4 additions & 0 deletions ci/apiv2/test_hashlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,7 @@ def test_bulk_archive(self):
def test_bulk_delete(self):
hashlists = [self.create_test_object(delete=False) for i in range(5)]
Hashlist.objects.delete_many(hashlists)

def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permHashlistRead': True})
2 changes: 1 addition & 1 deletion ci/apiv2/test_pretask.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from hashtopolis import Pretask, HashtopolisError
from utils import BaseTest,do_create_pretask
from utils import BaseTest


class PretaskTest(BaseTest):
Expand Down
4 changes: 4 additions & 0 deletions ci/apiv2/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ def test_toggle_archive_task_supertask_type(self):
# UPDATE tasks SET isArchived = $taskState WHERE taskWrapperId = $wrapper->getId()
# This test validates that the query pattern works correctly.

def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permTaskRead': True})

def test_toggle_archive_task_invalid_type_error(self):
"""Test that toggleArchiveTask throws an error for invalid task types"""
# Create a normal task
Expand Down
4 changes: 4 additions & 0 deletions ci/apiv2/test_taskwrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@ def test_helper_create_supertask_generic_cracker(self):
self.assertEqual(len(objs), 1, "Should only create 1 TaskWrapper")
self.assertEqual(taskwrapper, objs[0],
"Returned create_supertask object != object found by filter")

def test_acl(self):
model_obj = self.create_test_object()
self._test_acl_list(model_obj, {'permTaskWrapperRead': True})
3 changes: 3 additions & 0 deletions ci/apiv2/testfiles/apitoken/create_apitoken_001.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"scopes": ["permHashlistRead"]
}
42 changes: 42 additions & 0 deletions ci/apiv2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

import confidence

from hashtopolis import ApiToken
from hashtopolis import AccessGroup
from hashtopolis import Helper
from hashtopolis import Agent
from hashtopolis import AgentAssignment
from hashtopolis import AgentBinary
Expand Down Expand Up @@ -102,6 +104,14 @@ def do_create_agentbinary(**kwargs):
return _do_create_obj_from_file(AgentBinary, 'create_agentbinary', **kwargs)


def do_create_apitoken(extra_payload={}, **kwargs):
now = int(time.time())
extra_payload = dict(extra_payload or {})
extra_payload.setdefault('startValid', now)
extra_payload.setdefault('endValid', now + 3600)
return _do_create_obj_from_file(ApiToken, 'create_apitoken', extra_payload, **kwargs)


def do_create_accessgroup(**kwargs):
return _do_create_obj_from_file(AccessGroup, 'create_accessgroup', **kwargs)

Expand Down Expand Up @@ -202,6 +212,27 @@ def do_create_voucher():
return Voucher(voucher=f'dummy-test-{stamp}').save()


def create_restricted_user(base_test, permissions):
"""Create a non-admin user with the given permissions and no access groups, then log in as them."""
password = 'acl-test-pass-123!'
group = do_create_globalpermissiongroup(permissions=permissions)
base_test.delete_after_test(group)
user = do_create_user(global_permission_group_id=group.id)
base_test.delete_after_test(user)
Helper().set_user_password(user, password)

# New users are auto-added to the default access group (ID 1). Remove the user so
# they have no access group membership, which is required for ACL tests to be meaningful.
connector = AccessGroup.objects.get_conn()
connector.authenticate()
uri = connector._api_endpoint + '/ui/accessgroups/1/relationships/userMembers'
headers = {**connector._headers, 'Content-Type': 'application/json'}
payload = {"data": [{"type": "User", "id": user.id}]}
r = requests.delete(uri, headers=headers, data=json.dumps(payload))
assert r.status_code in [201], f"Failed to remove user from default access group: status={r.status_code} body={r.text}"

return (user.name, password)

def find_stale_test_objects():
# Order matters, for example a Task needs to be removed before Hashlist can be removed
# Note: we are not removing default database objects
Expand Down Expand Up @@ -281,6 +312,9 @@ def _create_test_object(self, model_create_func, *nargs, delete=True, **kwargs):
def create_test_object(self, *nargs, **kwargs):
raise NotImplementedError("Implement class specific create_test_object mapping function")

def create_apitoken(self, **kwargs):
return self._create_test_object(do_create_apitoken, **kwargs)

def create_accessgroup(self, **kwargs):
return self._create_test_object(do_create_accessgroup, **kwargs)

Expand Down Expand Up @@ -384,6 +418,14 @@ def _test_exception(self, func_create, *args, **kwargs):
# checks len of both old and new exceptions style, TODO: old can be removed when ervything has been refactored.
self.assertTrue(len(e.exception.exception_details) >= 1 or len(e.exception.title) >= 1)

def _test_acl_list(self, model_obj, permissions):
"""Test that a restricted user (with no access groups) cannot see the object in list results."""
auth = create_restricted_user(self, permissions)
objs = list(self.model_class.objects.filter(id=model_obj.id).authenticate(auth))
self.assertEqual(len(objs), 0, "Restricted user should not see this object in list results")
objs = list(self.model_class.objects.filter(id=model_obj.id))
self.assertGreater(len(objs), 0, "Admin user should see this object in list results")

def _test_patch(self, model_obj, attr, new_attr_value=None):
""" Generic test worker to PATCH object"""
# Create new value
Expand Down
Loading