From 74a681b539c3fae106b6e3591cd370c30bacc893 Mon Sep 17 00:00:00 2001 From: Matvii Sakhnenko Date: Tue, 7 Apr 2026 13:06:24 +0200 Subject: [PATCH] fix: run scheduled jobs in background tasks to prevent bus blocking AgentHandler.handle_scheduled now dispatches work via asyncio.create_task and returns immediately, preventing long Claude executions from blocking the event bus and causing missed heartbeats. A semaphore (max 2) caps concurrent Claude executions. handle_webhook remains unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/events/handlers.py | 42 ++++-- .../test_events/test_concurrent_scheduled.py | 124 ++++++++++++++++++ 2 files changed, 156 insertions(+), 10 deletions(-) create mode 100644 tests/unit/test_events/test_concurrent_scheduled.py diff --git a/src/events/handlers.py b/src/events/handlers.py index 74ceaaad..d2dcc334 100644 --- a/src/events/handlers.py +++ b/src/events/handlers.py @@ -4,8 +4,9 @@ NotificationHandler: subscribes to AgentResponseEvent and delivers to Telegram. """ +import asyncio from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Set import structlog @@ -35,6 +36,8 @@ def __init__( self.claude = claude_integration self.default_working_directory = default_working_directory self.default_user_id = default_user_id + self._scheduled_semaphore = asyncio.Semaphore(2) + self._background_tasks: Set[asyncio.Task[Any]] = set() def register(self) -> None: """Subscribe to events that need agent processing.""" @@ -92,15 +95,27 @@ async def handle_scheduled(self, event: Event) -> None: job_name=event.job_name, ) - prompt = event.prompt - if event.skill_name: - prompt = ( - f"/{event.skill_name}\n\n{prompt}" if prompt else f"/{event.skill_name}" - ) + task = asyncio.create_task( + self._run_scheduled(event), + name=f"scheduled:{event.job_id or event.job_name or event.id}", + ) + self._background_tasks.add(task) + task.add_done_callback(self._task_done) + await asyncio.sleep(0) + + async def _run_scheduled(self, event: ScheduledEvent) -> None: + """Run scheduled Claude work in the background with concurrency limits.""" + async with self._scheduled_semaphore: + prompt = event.prompt + if event.skill_name: + prompt = ( + f"/{event.skill_name}\n\n{prompt}" + if prompt + else f"/{event.skill_name}" + ) - working_dir = event.working_directory or self.default_working_directory + working_dir = event.working_directory or self.default_working_directory - try: response = await self.claude.run_command( prompt=prompt, working_directory=working_dir, @@ -126,11 +141,18 @@ async def handle_scheduled(self, event: Event) -> None: originating_event_id=event.id, ) ) + + def _task_done(self, task: asyncio.Task[Any]) -> None: + """Clean up finished scheduled tasks and log failures.""" + self._background_tasks.discard(task) + try: + task.result() + except asyncio.CancelledError: + return except Exception: logger.exception( "Agent execution failed for scheduled event", - job_id=event.job_id, - event_id=event.id, + task_name=task.get_name(), ) def _build_webhook_prompt(self, event: WebhookEvent) -> str: diff --git a/tests/unit/test_events/test_concurrent_scheduled.py b/tests/unit/test_events/test_concurrent_scheduled.py new file mode 100644 index 00000000..b1d25506 --- /dev/null +++ b/tests/unit/test_events/test_concurrent_scheduled.py @@ -0,0 +1,124 @@ +"""Tests for concurrent scheduled event handling.""" + +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from src.events.bus import EventBus +from src.events.handlers import AgentHandler +from src.events.types import ScheduledEvent, WebhookEvent + + +@pytest.fixture +def event_bus() -> EventBus: + return EventBus() + + +@pytest.fixture +def mock_claude() -> AsyncMock: + mock = AsyncMock() + mock.run_command = AsyncMock() + return mock + + +@pytest.fixture +def agent_handler(event_bus: EventBus, mock_claude: AsyncMock) -> AgentHandler: + return AgentHandler( + event_bus=event_bus, + claude_integration=mock_claude, + default_working_directory=Path("/tmp/test"), + default_user_id=42, + ) + + +class TestConcurrentScheduledHandling: + """Tests for background dispatch of scheduled jobs.""" + + async def test_handle_scheduled_returns_immediately( + self, agent_handler: AgentHandler, mock_claude: AsyncMock + ) -> None: + """Scheduled jobs should dispatch to the background and return fast.""" + + async def slow_run_command(**_: object) -> MagicMock: + await asyncio.sleep(10) + response = MagicMock() + response.content = "done" + return response + + mock_claude.run_command.side_effect = slow_run_command + event = ScheduledEvent( + job_name="standup", + prompt="Generate daily standup", + target_chat_ids=[100], + ) + + await asyncio.wait_for(agent_handler.handle_scheduled(event), timeout=1.0) + + for task in list(getattr(agent_handler, "_background_tasks", set())): + task.cancel() + if getattr(agent_handler, "_background_tasks", None): + await asyncio.gather(*agent_handler._background_tasks, return_exceptions=True) + + def test_scheduled_semaphore_limits_concurrency( + self, agent_handler: AgentHandler + ) -> None: + """Scheduled jobs are capped to two concurrent Claude executions.""" + assert agent_handler._scheduled_semaphore._value == 2 + + async def test_scheduled_task_errors_are_logged_not_raised( + self, agent_handler: AgentHandler, mock_claude: AsyncMock + ) -> None: + """Background scheduled task failures should be logged, not propagated.""" + + async def boom(**_: object) -> MagicMock: + raise RuntimeError("SDK error") + + mock_claude.run_command.side_effect = boom + event = ScheduledEvent( + job_name="standup", + prompt="Generate daily standup", + target_chat_ids=[100], + ) + + with patch("src.events.handlers.logger.exception") as mock_log: + await agent_handler.handle_scheduled(event) + tasks = list(agent_handler._background_tasks) + assert tasks + await asyncio.gather(*tasks, return_exceptions=True) + await asyncio.sleep(0) + + mock_log.assert_called() + + async def test_webhook_handler_still_blocks( + self, agent_handler: AgentHandler, mock_claude: AsyncMock + ) -> None: + """Webhook handling should still await Claude directly.""" + release = asyncio.Event() + started = asyncio.Event() + + async def blocked_run_command(**_: object) -> MagicMock: + started.set() + await release.wait() + response = MagicMock() + response.content = "done" + return response + + mock_claude.run_command.side_effect = blocked_run_command + event = WebhookEvent( + provider="github", + event_type_name="push", + payload={"ref": "refs/heads/main"}, + delivery_id="del-1", + ) + + task = asyncio.create_task(agent_handler.handle_webhook(event)) + await started.wait() + await asyncio.sleep(0) + + assert not task.done() + + release.set() + await asyncio.wait_for(task, timeout=1.0) + mock_claude.run_command.assert_awaited_once()