From 8ab3502c64e2f003a17314cd13fe721f6d9ec0dc Mon Sep 17 00:00:00 2001 From: Filippo Ledda Date: Tue, 17 Dec 2024 14:42:37 +0100 Subject: [PATCH 1/2] CH-167 Async support on Django user attach middleware --- .../cloudharness_django/middleware.py | 62 +++++++++++++++---- .../cloudharness_django/services/user.py | 10 +++ 2 files changed, 60 insertions(+), 12 deletions(-) diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py index 56f707e7..e10ecbd9 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py @@ -2,6 +2,8 @@ import jwt from django.contrib.auth.models import User +from asgiref.sync import iscoroutinefunction +from django.utils.decorators import sync_and_async_middleware from keycloak.exceptions import KeycloakGetError @@ -36,20 +38,56 @@ def _get_user(): return None -class BearerTokenMiddleware: - def __init__(self, get_response=None): - # One-time configuration and initialization. - self.get_response = get_response +async def _aget_user(): + bearer = get_authentication_token() + if bearer: + # found bearer token get the Django user + try: + token = bearer.split(" ")[-1] + payload = jwt.decode(token, algorithms=["RS256"], options={"verify_signature": False}, audience="web-client") + kc_id = payload["sub"] + try: + user = await User.objects.aget(member__kc_id=kc_id) + except User.DoesNotExist: + user = await get_user_service().async_kc_user(get_auth_service().get_auth_client().get_current_user()) + return user + except KeycloakGetError: + # KC user not found + return None + except InvalidToken: + return None + except Exception as e: + log.exception("User mapping error, %s", payload["email"]) + return None + + return None + + +@sync_and_async_middleware +def BearerTokenMiddleware(get_response=None): + # One-time configuration and initialization. + if iscoroutinefunction(get_response): + async def middleware(request): + if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True): + user = await _aget_user() + if user: + # auto login, set the user + request.user = user + request._cached_user = user - def __call__(self, request): - if getattr(getattr(request, "user", {}), "is_anonymous", True): - user = _get_user() - if user: - # auto login, set the user - request.user = user - request._cached_user = user + response = await get_response(request) + return response + else: + def middleware(request): + if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True): + user = _get_user() + if user: + # auto login, set the user + request.user = user + request._cached_user = user - return self.get_response(request) + return get_response(request) + return middleware class BearerTokenAuthentication: diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py index 189e6516..b63be738 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py @@ -107,6 +107,16 @@ def sync_kc_user(self, kc_user, is_superuser=False, delete=False): user.save() return user + async def async_kc_user(self, kc_user, is_superuser=False, delete=False): + # sync the kc user with the django user + + user, created = await User.objects.aget_or_create(username=kc_user["username"]) + + await Member.objects.aget_or_create(user=user, kc_id=kc_user["id"]) + user = self._map_kc_user(user, kc_user, is_superuser, delete) + user.save() + return user + def sync_kc_user_groups(self, kc_user): # Sync the user usergroups and memberships user = User.objects.get(username=kc_user["email"]) From b9e3a1d892f1e6dad0b47e8de4c8e787a6a5831c Mon Sep 17 00:00:00 2001 From: Filippo Ledda Date: Tue, 17 Dec 2024 17:44:50 +0100 Subject: [PATCH 2/2] CH-167 Async support on Django related events --- .../cloudharness_django/admin.py | 24 +++++-- .../cloudharness_django/middleware.py | 34 ++------- .../cloudharness_django/services/__init__.py | 4 +- .../cloudharness_django/services/events.py | 12 ++-- .../cloudharness_django/services/user.py | 72 ++++++++----------- .../cloudharness/events/client.py | 50 ++++++------- 6 files changed, 89 insertions(+), 107 deletions(-) diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/admin.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/admin.py index ea3d423d..058a35dc 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/admin.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/admin.py @@ -2,7 +2,7 @@ from django.contrib import admin from django.contrib.auth.admin import UserAdmin, GroupAdmin from django.contrib.auth.models import User, Group - +import asyncio from admin_extra_buttons.api import ExtraButtonsMixin, button from .models import Member from cloudharness_django.services import get_user_service @@ -17,6 +17,20 @@ class MemberAdmin(admin.StackedInline): model = Member +def run_coroutine(coroutine): + try: + loop = asyncio.get_running_loop() + except RuntimeError: # No running event loop + loop = None + + if loop and loop.is_running(): + # If the event loop is already running, create a task + return asyncio.create_task(coroutine) + else: + # If no event loop is running, run the coroutine using asyncio.run + return asyncio.run(coroutine) + + class CHUserAdmin(ExtraButtonsMixin, UserAdmin): inlines = [MemberAdmin] @@ -31,8 +45,8 @@ def has_delete_permission(self, request, obj=None): return settings.DEBUG or settings.USER_CHANGE_ENABLED @button() - def sync_keycloak(self, request): - get_user_service().sync_kc_users_groups() + async def sync_keycloak(self, request): + run_coroutine(get_user_service().sync_kc_users_groups()) self.message_user(request, 'Keycloak users & groups synced.') @@ -48,8 +62,8 @@ def has_delete_permission(self, request, obj=None): return settings.DEBUG @button() - def sync_keycloak(self, request): - get_user_service().sync_kc_users_groups() + async def sync_keycloak(self, request): + run_coroutine(get_user_service().sync_kc_users_groups()) self.message_user(request, 'Keycloak users & groups synced.') diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py index e10ecbd9..c96a14cb 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/middleware.py @@ -4,6 +4,7 @@ from django.contrib.auth.models import User from asgiref.sync import iscoroutinefunction from django.utils.decorators import sync_and_async_middleware +from asgiref.sync import async_to_sync, iscoroutinefunction from keycloak.exceptions import KeycloakGetError @@ -13,32 +14,7 @@ from cloudharness import log -def _get_user(): - bearer = get_authentication_token() - if bearer: - # found bearer token get the Django user - try: - token = bearer.split(" ")[-1] - payload = jwt.decode(token, algorithms=["RS256"], options={"verify_signature": False}, audience="web-client") - kc_id = payload["sub"] - try: - user = User.objects.get(member__kc_id=kc_id) - except User.DoesNotExist: - user = get_user_service().sync_kc_user(get_auth_service().get_auth_client().get_current_user()) - return user - except KeycloakGetError: - # KC user not found - return None - except InvalidToken: - return None - except Exception as e: - log.exception("User mapping error, %s", payload["email"]) - return None - - return None - - -async def _aget_user(): +async def _get_user(): bearer = get_authentication_token() if bearer: # found bearer token get the Django user @@ -49,7 +25,7 @@ async def _aget_user(): try: user = await User.objects.aget(member__kc_id=kc_id) except User.DoesNotExist: - user = await get_user_service().async_kc_user(get_auth_service().get_auth_client().get_current_user()) + user = await get_user_service().sync_kc_user(get_auth_service().get_auth_client().get_current_user()) return user except KeycloakGetError: # KC user not found @@ -69,7 +45,7 @@ def BearerTokenMiddleware(get_response=None): if iscoroutinefunction(get_response): async def middleware(request): if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True): - user = await _aget_user() + user = await _get_user() if user: # auto login, set the user request.user = user @@ -80,7 +56,7 @@ async def middleware(request): else: def middleware(request): if (not request.path.startswith("/static")) and getattr(getattr(request, "user", {}), "is_anonymous", True): - user = _get_user() + user = async_to_sync(_get_user)() if user: # auto login, set the user request.user = user diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/__init__.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/__init__.py index 330a760e..24c8c77f 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/__init__.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/__init__.py @@ -14,14 +14,14 @@ def get_auth_service(): global _auth_service if not _auth_service: - raise KeycloakOIDCAuthServiceNotInitError("Auth Service not initialized") + init_services() return _auth_service def get_user_service(): global _user_service if not _user_service: - raise KeycloakOIDUserServiceNotInitError("User Service not initialized") + init_services() return _user_service diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/events.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/events.py index 170b41ac..9e5e4008 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/events.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/events.py @@ -1,5 +1,5 @@ from cloudharness.applications import ConfigurationCallException - +import asyncio from django.conf import settings from kafka.errors import TopicAlreadyExistsError @@ -18,7 +18,7 @@ def __init__(self, kafka_group_id): self.topics_initialized = False @staticmethod - def event_handler(app, event_client, message): + async def event_handler(app, event_client, message): resource = message["resource-type"] operation = message["operation-type"] resource_path = message["resource-path"].split("/") @@ -32,20 +32,20 @@ def event_handler(app, event_client, message): if resource == "GROUP": kc_group = auth_client.get_group(resource_path[1]) - user_service.sync_kc_group(kc_group) + await user_service.sync_kc_group(kc_group) if resource == "USER": kc_user = auth_client.get_user(resource_path[1]) - user_service.sync_kc_user(kc_user, delete=operation == "DELETE") + await user_service.sync_kc_user(kc_user, delete=operation == "DELETE") if resource == "CLIENT_ROLE_MAPPING": # adding/deleting user client roles # set/user user is_superuser kc_user = auth_client.get_user(resource_path[1]) - user_service.sync_kc_user(kc_user) + await user_service.sync_kc_user(kc_user) if resource == "GROUP_MEMBERSHIP": # adding / deleting users from groups, update the user # updating the user will also update the user groups kc_user = auth_client.get_user(resource_path[1]) - user_service.sync_kc_user(kc_user) + await user_service.sync_kc_user(kc_user) except Exception as e: log.error(e) raise e diff --git a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py index b63be738..cc046c0a 100644 --- a/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py +++ b/infrastructure/common-images/cloudharness-django/libraries/cloudharness-django/cloudharness_django/services/user.py @@ -1,3 +1,4 @@ +import asyncio from django.contrib.auth.models import User, Group from cloudharness_django.models import Team, Member @@ -52,62 +53,52 @@ def update_team(self, group): self.auth_client.update_group(group.team.kc_id, group.name) return group - def add_user_to_team(self, user, team_name): + async def add_user_to_team(self, user, team_name): # add a user from the group/team - group = Group.objects.get(name=team_name) + group = Group.objects.aget(name=team_name) kc_group_id = group.team.kc_id kc_user_id = user.member.kc_id self.auth_client.group_user_add(kc_user_id, kc_group_id) - def rm_user_from_team(self, user, team_name): + async def rm_user_from_team(self, user, team_name): # delete a user from the group/team - group = Group.objects.get(name=team_name) + group = await Group.objects.aget(name=team_name) kc_group_id = group.team.kc_id kc_user_id = user.member.kc_id self.auth_client.group_user_remove(kc_user_id, kc_group_id) - def sync_kc_group(self, kc_group): + async def sync_kc_group(self, kc_group): # sync the kc group with the django group try: - team = Team.objects.get(kc_id=kc_group["id"]) - group, created = Group.objects.get_or_create(team=team) + team = await Team.objects.aget(kc_id=kc_group["id"]) + group, created = await Group.objects.aget_or_create(team=team) group.name = kc_group["name"] except Team.DoesNotExist: - group, created = Group.objects.get_or_create(name=kc_group["name"]) + group, created = await Group.objects.aget_or_create(name=kc_group["name"]) try: # check if group has a team team = group.team except Exception as e: # create the team - superusers = User.objects.filter(is_superuser=True) - if superusers and len(superusers) > 0: - team = Team.objects.create( - owner=superusers[0], # one of the superusers will be the default team owner + try: + superuser = User.objects.filter(is_superuser=True).afirst() + + team = await Team.objects.acreate( + owner=superuser, # one of the superusers will be the default team owner kc_id=kc_group["id"], group=group) - team.save() - group.save() + await team.asave() + except User.DoesNotExist as ex: + raise Exception("There is no superuser") from ex + await group.asave() - def sync_kc_groups(self, kc_groups=None): + async def sync_kc_groups(self, kc_groups=None): # sync all groups if not kc_groups: kc_groups = self.auth_client.get_groups() - for kc_group in kc_groups: - self.sync_kc_group(kc_group) - - def sync_kc_user(self, kc_user, is_superuser=False, delete=False): - # sync the kc user with the django user - - user, created = User.objects.get_or_create(username=kc_user["username"]) - - member, created = Member.objects.get_or_create(user=user) - member.kc_id = kc_user["id"] - member.save() - user = self._map_kc_user(user, kc_user, is_superuser, delete) - user.save() - return user + await asyncio.gather(self.sync_kc_group(kc_group) for kc_group in kc_groups) - async def async_kc_user(self, kc_user, is_superuser=False, delete=False): + async def sync_kc_user(self, kc_user, is_superuser=False, delete=False): # sync the kc user with the django user user, created = await User.objects.aget_or_create(username=kc_user["username"]) @@ -117,23 +108,23 @@ async def async_kc_user(self, kc_user, is_superuser=False, delete=False): user.save() return user - def sync_kc_user_groups(self, kc_user): + async def sync_kc_user_groups(self, kc_user): # Sync the user usergroups and memberships - user = User.objects.get(username=kc_user["email"]) + user = await User.objects.aget(username=kc_user["email"]) user_groups = [] for kc_group in kc_user["userGroups"]: - user_groups += [Group.objects.get(name=kc_group["name"])] - user.groups.set(user_groups) - user.save() + user_groups += [await Group.objects.aget(name=kc_group["name"])] + await user.groups.aset(user_groups) + user.asave() try: if user.member.kc_id != kc_user["id"]: user.member.kc_id = kc_user["id"] except Member.DoesNotExist: member = Member(user=user, kc_id=kc_user["id"]) - member.save() + await member.asave() - def sync_kc_users_groups(self): + async def sync_kc_users_groups(self): # cache all admin users to minimize KC rest api calls all_admin_users = self.auth_client.get_client_role_members( self.auth_service.get_client_name(), @@ -144,11 +135,10 @@ def sync_kc_users_groups(self): for kc_user in self.auth_client.get_users(): # check if user in all_admin_users is_superuser = any([admin_user for admin_user in all_admin_users if admin_user["email"] == kc_user["email"]]) - self.sync_kc_user(kc_user, is_superuser) + await self.sync_kc_user(kc_user, is_superuser) # sync the groups - self.sync_kc_groups() + await self.sync_kc_groups() # sync the user groups and memberships - for kc_user in self.auth_client.get_users(): - self.sync_kc_user_groups(kc_user) + await asyncio.gather(self.sync_kc_user_groups(kc_user) for kc_user in self.auth_client.get_users()) diff --git a/libraries/cloudharness-common/cloudharness/events/client.py b/libraries/cloudharness-common/cloudharness/events/client.py index f7c00391..eddf2579 100644 --- a/libraries/cloudharness-common/cloudharness/events/client.py +++ b/libraries/cloudharness-common/cloudharness/events/client.py @@ -4,7 +4,8 @@ import time from typing import List, Generator import logging - +import asyncio +from asgiref.sync import iscoroutinefunction from time import sleep from cloudharness import json, dumps @@ -241,14 +242,18 @@ def close(self): # for now no cleanup tasks to do pass - def _consume_task(self, app=None, group_id=None, handler=None): + async def _consume_task(self, app=None, group_id=None, handler=None): + log.info(f'Kafka consumer thread started, listening for messages in queue: {self.topic_id}') while True: try: self.consumer = self._get_consumer(group_id) for message in self.consumer: try: - handler(event_client=self, app=app, message=message.value) + if iscoroutinefunction(handler): + await handler(event_client=self, app=app, message=message.value) + else: + handler(event_client=self, app=app, message=message.value) except Exception as e: log.error(f"Error during execution of the consumer Topic {self.topic_id} --> {e}", exc_info=True) self.consumer.close() @@ -262,28 +267,25 @@ def async_consume(self, app=None, handler=None, group_id='default'): log.debug('get current object from app') app = app._get_current_object() self._consumer_thread = threading.Thread( - target=self._consume_task, - kwargs={'app': app, - 'group_id': group_id, - 'handler': handler}) + target=asyncio.run(self._consume_task(app, group_id, handler)) + ) self._consumer_thread.daemon = True self._consumer_thread.start() log.debug('thread started') - -if __name__ == "__main__": - # creat the required os env variables - os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id() - os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service() - - # instantiate the client - client = EventClient('test-sync-op-results-qcwbc') - - # create a topic from env variables - # print(client.create_topic()) - # publish to the prev created topic - # print(client.produce({"message": "In God we trust, all others bring data..."})) - # read from the topic - print(client.consume_all('my-group')) - # delete the topic - # print(client.delete_topic()) + if __name__ == "__main__": + # creat the required os env variables + os.environ['CLOUDHARNESS_EVENTS_CLIENT_ID'] = env.get_cloudharness_events_client_id() + os.environ['CLOUDHARNESS_EVENTS_SERVICE'] = env.get_cloudharness_events_service() + + # instantiate the client + client = EventClient('test-sync-op-results-qcwbc') + + # create a topic from env variables + # print(client.create_topic()) + # publish to the prev created topic + # print(client.produce({"message": "In God we trust, all others bring data..."})) + # read from the topic + print(client.consume_all('my-group')) + # delete the topic + # print(client.delete_topic())