Did you check whether your message bus is actually async? I am new to asyncio, but it looks like it will run synchronously. This is what I was thinking:
import logging
from collections.abc import Awaitable, Callable

import anyio

# Event, Command and AsyncIAMUnitOfWork come from the project's own modules.
logger = logging.getLogger(__name__)

# Handlers are async callables (called, then awaited), not coroutine objects.
Handler = Callable[..., Awaitable[None]]


class AsyncIAMMessageBus:
    def __init__(
        self,
        uow: AsyncIAMUnitOfWork,
        event_handlers: dict[type[Event], list[Handler]],
        command_handlers: dict[type[Command], Handler],
    ):
        self.uow = uow
        self.event_handlers = event_handlers
        self.command_handlers = command_handlers

    async def handle(self, message: Event | Command):
        if isinstance(message, Command):
            await self._handle_command(message)
        elif isinstance(message, Event):
            await self._handle_event(message)
        else:
            raise Exception(f"{message} is not a Command or Event")

    async def _completed(self, handler, message: Event | Command):
        try:
            await handler(message)
        except Exception:
            if isinstance(message, Event):
                # A failing event handler is logged and swallowed.
                logger.exception(f"Exception handling event {message}")
            else:
                # A failing command handler propagates to the caller.
                logger.exception(f"Exception handling command {message}")
                raise
        else:
            # Dispatch any events raised by the handler, concurrently.
            new_events = self.uow.collect_new_events()
            if new_events:
                async with anyio.create_task_group() as tg:
                    for event in new_events:
                        tg.start_soon(self.handle, event)

    async def _handle_event(self, event: Event):
        handlers = list(self.event_handlers[type(event)])
        async with anyio.create_task_group() as tg:
            for handler in handlers:
                tg.start_soon(self._completed, handler, event)

    async def _handle_command(self, command: Command):
        logger.debug("handling command %s", command)
        handler = self.command_handlers[type(command)]
        async with anyio.create_task_group() as tg:
            tg.start_soon(self._completed, handler, command)
I am using anyio, but it would be the same with asyncio, using `asyncio.TaskGroup` (Python 3.11+). This way the handlers run concurrently and you pick up new events right away. I also think I should add a timeout for the tasks...
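On the timeout idea, here is a rough sketch of what I have in mind, not a tested part of the bus. It assumes plain asyncio rather than anyio, and it uses `asyncio.gather` with `asyncio.wait_for` instead of a task group so that it also runs on Python versions older than 3.11. The names `run_with_timeout`, `dispatch`, `fast_handler`, `slow_handler` and `HANDLER_TIMEOUT` are made up for the example.

```python
import asyncio

HANDLER_TIMEOUT = 0.2  # seconds; hypothetical value picked for this demo


async def run_with_timeout(handler, message, timeout=HANDLER_TIMEOUT):
    """Await one handler and give up (cancelling it) after `timeout` seconds."""
    try:
        await asyncio.wait_for(handler(message), timeout)
        return "ok"
    except asyncio.TimeoutError:
        return "timeout"


async def fast_handler(message):
    # Stand-in for a handler that finishes well within the limit.
    await asyncio.sleep(0.01)


async def slow_handler(message):
    # Stand-in for a handler that hangs longer than the limit.
    await asyncio.sleep(5)


async def dispatch(message, handlers):
    # gather() runs the wrapped handlers concurrently and returns the
    # results in the same order as the handlers were passed in.
    return await asyncio.gather(
        *(run_with_timeout(handler, message) for handler in handlers)
    )


results = asyncio.run(dispatch("event", [fast_handler, slow_handler]))
print(results)
```

The slow handler is cancelled once the limit is reached, so it does not hold up the fast one. With anyio, I would expect the same effect from wrapping the handler call in `anyio.fail_after(...)` or `anyio.move_on_after(...)` inside `_completed`.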