Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/ash/cli/commands/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,21 @@ def query_logs(
return newest_first_entries


def _entry_matches_search(entry: dict[str, Any], search_pattern: str) -> bool:
"""Return True when a log entry matches a free-text search."""
needle = search_pattern.lower()

message = str(entry.get("message", ""))
if needle in message.lower():
return True

try:
structured = json.dumps(entry, sort_keys=True, default=str)
except TypeError:
structured = str(entry)
return needle in structured.lower()


def _read_log_file(
log_file: Path,
since: datetime | None = None,
Expand Down Expand Up @@ -301,10 +316,8 @@ def _read_log_file(
continue

# Filter by search pattern
if search_pattern:
message = entry.get("message", "")
if search_pattern.lower() not in message.lower():
continue
if search_pattern and not _entry_matches_search(entry, search_pattern):
continue

entries.append(entry)
except OSError:
Expand Down Expand Up @@ -370,10 +383,10 @@ def _follow_logs(
if component and entry.get("component") != component:
continue

if search_pattern:
message = entry.get("message", "")
if search_pattern.lower() not in message.lower():
continue
if search_pattern and not _entry_matches_search(
entry, search_pattern
):
continue

# Display entry
last_date = _display_entries(
Expand Down
24 changes: 24 additions & 0 deletions src/ash/providers/telegram/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ash.agents.types import ChildActivated
from ash.config.models import ConversationConfig
from ash.core import Agent
from ash.core.signals import is_no_reply
from ash.providers.base import IncomingMessage, OutgoingMessage
from ash.providers.telegram.handlers.checkpoint_handler import CheckpointHandler
from ash.providers.telegram.handlers.passive_handler import PassiveHandler
Expand Down Expand Up @@ -934,6 +935,29 @@ async def _run_orchestration_loop(
self._stack_manager.clear(session_key)
self._persist_stack(session_key, sm)
return response_external_id
parent = stack.top
if (
completed.agent_type == "skill"
and parent is not None
and parent.agent_type == "main"
and is_no_reply(result.text)
):
logger.info(
"child_no_reply_suppressed",
extra={
"child_agent": completed.agent_name,
"remaining_depth": stack.depth,
},
)
main_frame = stack.pop()
await self._agent.run_message_postprocess_hooks(
user_message="",
session=main_frame.session,
effective_user_id=main_frame.context.user_id or "",
)
self._stack_manager.clear(session_key)
self._persist_stack(session_key, sm)
return response_external_id
self._persist_stack(session_key, sm)
# Inject result into parent's pending tool_use
assert completed.parent_tool_use_id is not None
Expand Down
31 changes: 31 additions & 0 deletions tests/test_logs_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,34 @@ def test_query_logs_returns_latest_entries_with_newest_last(tmp_path) -> None:
results = query_logs(logs_dir, limit=2)

assert [entry["message"] for entry in results] == ["second", "third"]


def test_query_logs_searches_structured_fields(tmp_path) -> None:
logs_dir = tmp_path / "logs"
logs_dir.mkdir()
log_file = logs_dir / "2026-03-17.jsonl"
entries = [
{
"ts": "2026-03-17T01:00:02Z",
"level": "INFO",
"component": "scheduling",
"message": "scheduled_task_triggered",
"schedule.entry_id": "df0f9dfd",
},
{
"ts": "2026-03-17T01:00:09Z",
"level": "INFO",
"component": "tools",
"message": "skill_invoked",
"skill": "sfday-telegram-alert",
},
]
log_file.write_text("".join(json.dumps(entry) + "\n" for entry in entries))

by_skill = query_logs(logs_dir, search_pattern="sfday-telegram-alert")
by_schedule_id = query_logs(logs_dir, search_pattern="df0f9dfd")

assert len(by_skill) == 1
assert by_skill[0]["message"] == "skill_invoked"
assert len(by_schedule_id) == 1
assert by_schedule_id[0]["message"] == "scheduled_task_triggered"
91 changes: 91 additions & 0 deletions tests/test_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest

from ash.agents.types import AgentContext, StackFrame, TurnAction, TurnResult
from ash.providers.base import IncomingMessage, OutgoingMessage
from ash.providers.telegram.formatting import rendered_text_length
from ash.providers.telegram.handlers import TelegramMessageHandler
from ash.providers.telegram.provider import TelegramProvider
from ash.sessions import SessionManager
from ash.sessions.types import generate_id


class TestTelegramProvider:
Expand Down Expand Up @@ -240,6 +243,94 @@ async def mock_stream():

mock_provider.send_typing.assert_called_once_with("456")

async def test_run_orchestration_loop_suppresses_skill_no_reply(
self, mock_provider, mock_agent, tmp_path
):
from ash.core.session import SessionState

handler = TelegramMessageHandler(
provider=mock_provider,
agent=mock_agent,
streaming=False,
agent_executor=MagicMock(),
)
mock_agent.run_message_postprocess_hooks = AsyncMock()

message = IncomingMessage(
id="1",
chat_id="456",
user_id="789",
text="run the sfday skill",
username="testuser",
display_name="Test User",
)
session_key = "telegram_456_789"
session_manager = SessionManager(
provider="telegram",
chat_id="456",
user_id="789",
sessions_path=tmp_path,
)
handler._session_handler._session_managers[session_manager.session_key] = (
session_manager
)

main_frame = StackFrame(
frame_id=generate_id(),
agent_name="main",
agent_type="main",
session=SessionState(
session_id="telegram_456_789_1",
provider="telegram",
chat_id="456",
user_id="789",
),
system_prompt="main prompt",
context=AgentContext(
session_id=session_manager.session_key,
user_id="789",
chat_id="456",
provider="telegram",
),
)
child_frame = StackFrame(
frame_id=generate_id(),
agent_name="skill:sfday-telegram-alert",
agent_type="skill",
session=SessionState(
session_id="agent-skill:sfday-telegram-alert-telegram_456_789",
provider="telegram",
chat_id="456",
user_id="789",
),
system_prompt="skill prompt",
context=AgentContext(
session_id=session_manager.session_key,
user_id="789",
chat_id="456",
provider="telegram",
),
is_skill_agent=True,
parent_tool_use_id="tool-1",
)
stack = handler._stack_manager.get_or_create(session_key)
stack.push(main_frame)
stack.push(child_frame)

handler._agent_executor.execute_turn = AsyncMock(
return_value=TurnResult(action=TurnAction.COMPLETE, text="[NO_REPLY]")
)

response_external_id = await handler._run_orchestration_loop(
message,
session_key,
)

assert response_external_id is None
assert handler._stack_manager.has_active(session_key) is False
mock_provider.send.assert_not_called()
mock_agent.run_message_postprocess_hooks.assert_awaited_once()

async def test_handle_message_streaming(
self, handler, mock_provider, mock_agent, incoming_message
):
Expand Down