Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions documentation/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,21 @@ If you want to add a list of tags, you do it as follows:
The **User** class enables you to manage users, creating, deleting and updating (as for
other HDX objects) according to your permissions.

You can obtain the currently logged in user (which is based on the API token used in the
configuration):

user = User.get_current_user()

You can check that the current user has a particular permission to a specific
organization:

result = User.check_current_user_organization_access("hdx", "read")

For a general access check to use before running a script that creates or updates
datasets:

username = User.check_current_user_write_access("hdx")

You can email a user. First you need to set up an email server using a dictionary or
file:

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ rsa==4.9.1
# via google-auth
ruamel-yaml==0.18.10
# via hdx-python-utilities
setuptools==80.7.1
setuptools==80.8.0
# via ckanapi
shellingham==1.5.4
# via typer
Expand Down
2 changes: 1 addition & 1 deletion src/hdx/data/hdxobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def _write_to_hdx(
self,
action: str,
data: Dict,
id_field_name: str = None,
id_field_name: Optional[str] = None,
files_to_upload: Dict = {},
) -> Union[Dict, List]:
"""Creates or updates an HDX object in HDX and return HDX object metadata dict
Expand Down
47 changes: 46 additions & 1 deletion src/hdx/data/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import hdx.data.organization
from hdx.api.configuration import Configuration
from hdx.data.hdxobject import HDXObject
from hdx.data.hdxobject import HDXError, HDXObject
from hdx.utilities.typehint import ListTuple

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -167,6 +167,20 @@ def email(
**kwargs,
)

@staticmethod
def get_current_user(configuration: Optional[Configuration] = None) -> "User":
"""Get current user (based on authorisation from API token)

Args:
configuration (Optional[Configuration]): HDX configuration. Defaults to global configuration.

Returns:
User: Current user
"""
user = User(configuration=configuration)
user._save_to_hdx("show", {})
return user

@staticmethod
def get_all_users(
configuration: Optional[Configuration] = None, **kwargs: Any
Expand Down Expand Up @@ -373,6 +387,37 @@ def check_current_user_organization_access(
return True
return False

@classmethod
def check_current_user_write_access(
cls, organization: str, permission: str = "create_dataset"
) -> "User":
"""Check logged in user has write access to a given organization. Raises
PermissionError if teh user does not have access otherwise logs and returns the
current username.

Args:
organization (str): Organization id or name.
permission (str): Permission to check for. Defaults to 'create_dataset'.

Returns:
str: Username of current user
"""
try:
current_user = cls.get_current_user()
except HDXError:
raise PermissionError(
"There is no logged in user (missing or invalid API token)!"
)
username = current_user["name"]
if not cls.check_current_user_organization_access(organization, permission):
raise PermissionError(
f'Current user "{username}" does not have "{permission}" access to "{organization}" organization!'
)
logger.info(
f'Current user "{username}" has "{permission}" access to "{organization}" organization'
)
return username

def get_token_list(self):
"""Get API tokens for user.

Expand Down
4 changes: 2 additions & 2 deletions src/hdx/facades/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
setup_logging(log_file="errors.log")


def facade(projectmainfn: Callable[[None], None], **kwargs: Any):
def facade(projectmainfn: Callable[[], None], **kwargs: Any):
"""Facade to simplify project setup that calls project main function

Args:
projectmainfn ((None) -> None): main function of project
projectmainfn (() -> None): main function of project
**kwargs: configuration parameters to pass to HDX Configuration class

Returns:
Expand Down
2 changes: 2 additions & 0 deletions tests/hdx/api/test_ckan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from hdx.api.locations import Locations
from hdx.data.dataset import Dataset
from hdx.data.resource import Resource
from hdx.data.user import User
from hdx.data.vocabulary import Vocabulary
from hdx.location.country import Country
from hdx.utilities.dateparse import now_utc
Expand All @@ -35,6 +36,7 @@ def configuration(self):
user_agent="test",
hdx_key=hdx_key,
)
User.check_current_user_write_access("5a63012e-6c41-420c-8c33-e84b277fdc90")
Locations._validlocations = None
Country.countriesdata(use_live=False)
Vocabulary._approved_vocabulary = None
Expand Down
71 changes: 71 additions & 0 deletions tests/hdx/data/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,25 @@ def post(url, data, headers, files, allow_redirects, auth=None):

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def show_current_user(self):
class MockSession:
@staticmethod
def post(url, data, headers, files, allow_redirects, auth=None):
if "show" not in url:
return MockResponse(
404,
'{"success": false, "error": {"message": "TEST ERROR: Not show", "__type": "TEST ERROR: Not Show Error"}, "help": "http://test-data.humdata.org/api/3/action/help_show?name=user_show"}',
)
result = json.dumps(resultdict)
return MockResponse(
200,
'{"success": true, "result": %s, "help": "http://test-data.humdata.org/api/3/action/help_show?name=user_show"}'
% result,
)

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def post_list(self):
class MockSession:
Expand Down Expand Up @@ -304,6 +323,42 @@ def post(url, data, headers, files, allow_redirects, auth=None):

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def post_check_current_user_write_access(self):
class MockSession:
@staticmethod
def post(url, data, headers, files, allow_redirects, auth=None):
decodedata = data.decode("utf-8")
datadict = json.loads(decodedata)
if "user" in url:
if "show" in url:
result = json.dumps(resultdict)
return MockResponse(
200,
'{"success": true, "result": %s, "help": "http://test-data.humdata.org/api/3/action/help_show?name=user_show"}'
% result,
)
elif "list" in url:
return MockResponse(
200,
'{"success": true, "result": %s, "help": "http://test-data.humdata.org/api/3/action/help_show?name=organization_list"}'
% json.dumps(orglist),
)
elif "organization" in url:
if "show" in url:
result = json.dumps(orgdict)
if (
datadict["id"] == "b67e6c74-c185-4f43-b561-0e114a736f19"
or datadict["id"] == "TEST1"
):
return MockResponse(
200,
'{"success": true, "result": %s, "help": "http://test-data.humdata.org/api/3/action/help_show?name=user_show"}'
% result,
)

Configuration.read().remoteckan().session = MockSession()

@pytest.fixture(scope="function")
def post_listorgs_invalid(self):
class MockSession:
Expand Down Expand Up @@ -522,6 +577,10 @@ def test_update_json(self, configuration, static_json):
assert user["name"] == "MyUser1"
assert user["about"] == "other"

def test_get_current_user(self, configuration, show_current_user):
user = User.get_current_user()
assert user["name"] == "MyUser1"

def test_get_all_users(self, configuration, post_list, mocksmtp):
users = User.get_all_users()
assert len(users) == 2
Expand Down Expand Up @@ -636,6 +695,18 @@ def test_get_organizations_invalid_user(self, configuration, post_listorgs_inval
assert user.get_organization_dicts() == []
assert User.get_current_user_organization_dicts() == []

def test_check_current_user_write_access(
self, configuration, post_check_current_user_write_access
):
username = User.check_current_user_write_access(
"b67e6c74-c185-4f43-b561-0e114a736f19"
)
assert username == "MyUser1"
username = User.check_current_user_write_access("acled")
assert username == "MyUser1"
with pytest.raises(PermissionError):
User.check_current_user_write_access("lala")

def test_get_token_list(self, configuration, post_tokenlist):
user = User.read_from_hdx("9f3e9973-7dbe-4c65-8820-f48578e3ffea")
tokens = user.get_token_list()
Expand Down