diff --git a/src/ash/cli/commands/logs.py b/src/ash/cli/commands/logs.py index aa2ffe63..137c58d4 100644 --- a/src/ash/cli/commands/logs.py +++ b/src/ash/cli/commands/logs.py @@ -255,6 +255,21 @@ def query_logs( return newest_first_entries +def _entry_matches_search(entry: dict[str, Any], search_pattern: str) -> bool: + """Return True when a log entry matches a free-text search.""" + needle = search_pattern.lower() + + message = str(entry.get("message", "")) + if needle in message.lower(): + return True + + try: + structured = json.dumps(entry, sort_keys=True, default=str) + except TypeError: + structured = str(entry) + return needle in structured.lower() + + def _read_log_file( log_file: Path, since: datetime | None = None, @@ -301,10 +316,8 @@ def _read_log_file( continue # Filter by search pattern - if search_pattern: - message = entry.get("message", "") - if search_pattern.lower() not in message.lower(): - continue + if search_pattern and not _entry_matches_search(entry, search_pattern): + continue entries.append(entry) except OSError: @@ -370,10 +383,10 @@ def _follow_logs( if component and entry.get("component") != component: continue - if search_pattern: - message = entry.get("message", "") - if search_pattern.lower() not in message.lower(): - continue + if search_pattern and not _entry_matches_search( + entry, search_pattern + ): + continue # Display entry last_date = _display_entries( diff --git a/src/ash/providers/telegram/handlers/message_handler.py b/src/ash/providers/telegram/handlers/message_handler.py index 55e6758c..4650cc1a 100644 --- a/src/ash/providers/telegram/handlers/message_handler.py +++ b/src/ash/providers/telegram/handlers/message_handler.py @@ -12,6 +12,7 @@ from ash.agents.types import ChildActivated from ash.config.models import ConversationConfig from ash.core import Agent +from ash.core.signals import is_no_reply from ash.providers.base import IncomingMessage, OutgoingMessage from ash.providers.telegram.handlers.checkpoint_handler import CheckpointHandler from ash.providers.telegram.handlers.passive_handler import PassiveHandler @@ -934,6 +935,29 @@ async def _run_orchestration_loop( self._stack_manager.clear(session_key) self._persist_stack(session_key, sm) return response_external_id + parent = stack.top + if ( + completed.agent_type == "skill" + and parent is not None + and parent.agent_type == "main" + and is_no_reply(result.text) + ): + logger.info( + "child_no_reply_suppressed", + extra={ + "child_agent": completed.agent_name, + "remaining_depth": stack.depth, + }, + ) + main_frame = stack.pop() + await self._agent.run_message_postprocess_hooks( + user_message="", + session=main_frame.session, + effective_user_id=main_frame.context.user_id or "", + ) + self._stack_manager.clear(session_key) + self._persist_stack(session_key, sm) + return response_external_id self._persist_stack(session_key, sm) # Inject result into parent's pending tool_use assert completed.parent_tool_use_id is not None diff --git a/tests/test_logs_command.py b/tests/test_logs_command.py index 60c10748..29f54ec2 100644 --- a/tests/test_logs_command.py +++ b/tests/test_logs_command.py @@ -146,3 +146,34 @@ def test_query_logs_returns_latest_entries_with_newest_last(tmp_path) -> None: results = query_logs(logs_dir, limit=2) assert [entry["message"] for entry in results] == ["second", "third"] + + +def test_query_logs_searches_structured_fields(tmp_path) -> None: + logs_dir = tmp_path / "logs" + logs_dir.mkdir() + log_file = logs_dir / "2026-03-17.jsonl" + entries = [ + { + "ts": "2026-03-17T01:00:02Z", + "level": "INFO", + "component": "scheduling", + "message": "scheduled_task_triggered", + "schedule.entry_id": "df0f9dfd", + }, + { + "ts": "2026-03-17T01:00:09Z", + "level": "INFO", + "component": "tools", + "message": "skill_invoked", + "skill": "sfday-telegram-alert", + }, + ] + log_file.write_text("".join(json.dumps(entry) + "\n" for entry in entries)) + + by_skill = query_logs(logs_dir, search_pattern="sfday-telegram-alert") + by_schedule_id = query_logs(logs_dir, search_pattern="df0f9dfd") + + assert len(by_skill) == 1 + assert by_skill[0]["message"] == "skill_invoked" + assert len(by_schedule_id) == 1 + assert by_schedule_id[0]["message"] == "scheduled_task_triggered" diff --git a/tests/test_providers.py b/tests/test_providers.py index 08e29d71..1d19b827 100644 --- a/tests/test_providers.py +++ b/tests/test_providers.py @@ -8,10 +8,13 @@ from aiogram.enums import ParseMode from aiogram.exceptions import TelegramBadRequest +from ash.agents.types import AgentContext, StackFrame, TurnAction, TurnResult from ash.providers.base import IncomingMessage, OutgoingMessage from ash.providers.telegram.formatting import rendered_text_length from ash.providers.telegram.handlers import TelegramMessageHandler from ash.providers.telegram.provider import TelegramProvider +from ash.sessions import SessionManager +from ash.sessions.types import generate_id class TestTelegramProvider: @@ -240,6 +243,94 @@ async def mock_stream(): mock_provider.send_typing.assert_called_once_with("456") + async def test_run_orchestration_loop_suppresses_skill_no_reply( + self, mock_provider, mock_agent, tmp_path + ): + from ash.core.session import SessionState + + handler = TelegramMessageHandler( + provider=mock_provider, + agent=mock_agent, + streaming=False, + agent_executor=MagicMock(), + ) + mock_agent.run_message_postprocess_hooks = AsyncMock() + + message = IncomingMessage( + id="1", + chat_id="456", + user_id="789", + text="run the sfday skill", + username="testuser", + display_name="Test User", + ) + session_key = "telegram_456_789" + session_manager = SessionManager( + provider="telegram", + chat_id="456", + user_id="789", + sessions_path=tmp_path, + ) + handler._session_handler._session_managers[session_manager.session_key] = ( + session_manager + ) + + main_frame = StackFrame( + frame_id=generate_id(), + agent_name="main", + agent_type="main", + session=SessionState( + session_id="telegram_456_789_1", + provider="telegram", + chat_id="456", + user_id="789", + ), + system_prompt="main prompt", + context=AgentContext( + session_id=session_manager.session_key, + user_id="789", + chat_id="456", + provider="telegram", + ), + ) + child_frame = StackFrame( + frame_id=generate_id(), + agent_name="skill:sfday-telegram-alert", + agent_type="skill", + session=SessionState( + session_id="agent-skill:sfday-telegram-alert-telegram_456_789", + provider="telegram", + chat_id="456", + user_id="789", + ), + system_prompt="skill prompt", + context=AgentContext( + session_id=session_manager.session_key, + user_id="789", + chat_id="456", + provider="telegram", + ), + is_skill_agent=True, + parent_tool_use_id="tool-1", + ) + stack = handler._stack_manager.get_or_create(session_key) + stack.push(main_frame) + stack.push(child_frame) + + handler._agent_executor.execute_turn = AsyncMock( + return_value=TurnResult(action=TurnAction.COMPLETE, text="[NO_REPLY]") + ) + + response_external_id = await handler._run_orchestration_loop( + message, + session_key, + ) + + assert response_external_id is None + assert handler._stack_manager.has_active(session_key) is False + mock_provider.send.assert_not_called() + mock_agent.run_message_postprocess_hooks.assert_awaited_once() + async def test_handle_message_streaming( self, handler, mock_provider, mock_agent, incoming_message ):