diff --git a/mcp/telegram-readonly/README.md b/mcp/telegram-readonly/README.md index ca12c13..a8b2b53 100644 --- a/mcp/telegram-readonly/README.md +++ b/mcp/telegram-readonly/README.md @@ -20,7 +20,7 @@ The daemon auto-starts on first use and shuts down after 30 min idle. ```bash tg dialogs [--query Q] [--limit N] # list chats -tg read [--limit N] # read messages +tg read [--limit N] # read messages, batched automatically tg search [--limit N] # search in a chat tg info # chat metadata tg daemon status|start|stop|log # manage daemon @@ -34,6 +34,13 @@ Output is JSON. Pipe through `jq` for filtering: tg read max_akhmedov -n 10 | jq '.[] | select(.sender_name != "K K") | .text' ``` +Large reads are paged through the daemon in batches of up to 100 messages, +then printed as one JSON array: + +```bash +tg read max_akhmedov -n 1000 > messages.json +``` + ## Why CLI + skill, not MCP The MCP server is kept for backward compatibility but CLI is the primary diff --git a/mcp/telegram-readonly/server.py b/mcp/telegram-readonly/server.py index c16ef7f..2bf69ee 100755 --- a/mcp/telegram-readonly/server.py +++ b/mcp/telegram-readonly/server.py @@ -59,6 +59,7 @@ def _require_env() -> tuple[int, str, str]: _PID_PATH = _STATE_DIR / "daemon.pid" _LOG_PATH = _STATE_DIR / "daemon.log" _IDLE_TIMEOUT = 1800 +_MAX_MESSAGES_PER_CALL = 100 # ── Helpers (shared by daemon) ─────────────────────────────────────────── @@ -183,7 +184,12 @@ async def handle_method(method: str, params: dict) -> Any: if method == "get_recent_messages": entity = await _resolve_chat(client, params["chat"]) msgs = [] - async for msg in client.iter_messages(entity, limit=max(1, min(params.get("limit", 30), 200))): + kwargs = { + "limit": max(1, min(params.get("limit", 30), _MAX_MESSAGES_PER_CALL)), + } + if params.get("offset_id") is not None: + kwargs["offset_id"] = int(params["offset_id"]) + async for msg in client.iter_messages(entity, **kwargs): msgs.append(_message_to_dict(msg)) return list(reversed(msgs)) if method == "search_messages": @@ -353,9 +359,12 @@ async def list_dialogs(limit: int = 50, query: str | None = None) -> Any: @mcp.tool() -async def get_recent_messages(chat: str, limit: int = 30) -> Any: +async def get_recent_messages(chat: str, limit: int = 30, offset_id: int | None = None) -> Any: """Read recent messages from a chat by username, id, exact title, or title substring.""" - return await _proxy_call("get_recent_messages", {"chat": chat, "limit": limit}) + params: dict = {"chat": chat, "limit": limit} + if offset_id is not None: + params["offset_id"] = offset_id + return await _proxy_call("get_recent_messages", params) @mcp.tool() diff --git a/mcp/telegram-readonly/test_tg.py b/mcp/telegram-readonly/test_tg.py new file mode 100644 index 0000000..4bc718d --- /dev/null +++ b/mcp/telegram-readonly/test_tg.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import importlib.util +import unittest +from importlib.machinery import SourceFileLoader +from pathlib import Path +from typing import Any +from unittest.mock import patch + + +ROOT = Path(__file__).resolve().parent + + +def _load_tg() -> Any: + spec = importlib.util.spec_from_loader("tg_cli", SourceFileLoader("tg_cli", str(ROOT / "tg"))) + assert spec is not None + assert spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +class ReadMessagesTest(unittest.IsolatedAsyncioTestCase): + async def test_read_messages_pages_oldest_to_newest(self) -> None: + tg = _load_tg() + calls: list[dict] = [] + pages = [ + [ + {"id": 4, "text": "four"}, + {"id": 5, "text": "five"}, + ], + [ + {"id": 2, "text": "two"}, + {"id": 3, "text": "three"}, + ], + [ + {"id": 1, "text": "one"}, + ], + ] + + async def fake_call(method: str, params: dict) -> list[dict]: + self.assertEqual(method, "get_recent_messages") + calls.append(params) + return pages[len(calls) - 1] + + with patch.object(tg, "_call", fake_call): + messages = await tg._read_messages("chat", limit=5, batch_size=2) + + self.assertEqual([m["id"] for m in messages], [1, 2, 3, 4, 5]) + self.assertEqual(calls, [ + {"chat": "chat", "limit": 2}, + {"chat": "chat", "limit": 2, "offset_id": 4}, + {"chat": "chat", "limit": 1, "offset_id": 2}, + ]) + + async def test_read_messages_rejects_non_paging_daemon(self) -> None: + tg = _load_tg() + pages = [ + [{"id": 4}, {"id": 5}], + [{"id": 4}, {"id": 5}], + ] + + async def fake_call(method: str, params: dict) -> list[dict]: + return pages.pop(0) + + with patch.object(tg, "_call", fake_call): + with self.assertRaisesRegex(RuntimeError, "paged reads"): + await tg._read_messages("chat", limit=4, batch_size=2) + + +if __name__ == "__main__": + unittest.main() diff --git a/mcp/telegram-readonly/tg b/mcp/telegram-readonly/tg index 4cd27cc..f86880a 100755 --- a/mcp/telegram-readonly/tg +++ b/mcp/telegram-readonly/tg @@ -27,6 +27,7 @@ _STATE_DIR = Path(os.getenv( _SOCK_PATH = _STATE_DIR / "daemon.sock" _PID_PATH = _STATE_DIR / "daemon.pid" _LOG_PATH = _STATE_DIR / "daemon.log" +_MAX_READ_BATCH = 100 async def _ensure_daemon() -> None: @@ -87,6 +88,39 @@ def _print(data: any) -> None: print(json.dumps(data, ensure_ascii=False, indent=2)) +async def _read_messages(chat: str, limit: int, batch_size: int, offset_id: int | None = None) -> list[dict]: + if limit < 1: + raise ValueError("--limit must be at least 1") + if batch_size < 1: + raise ValueError("--batch-size must be at least 1") + batch_size = min(batch_size, _MAX_READ_BATCH) + + pages: list[list[dict]] = [] + remaining = limit + next_offset_id = offset_id + + while remaining > 0: + page_limit = min(batch_size, remaining) + params: dict = {"chat": chat, "limit": page_limit} + if next_offset_id is not None: + params["offset_id"] = next_offset_id + + page = await _call("get_recent_messages", params) + if not page: + break + if next_offset_id is not None and page[0]["id"] >= next_offset_id: + raise RuntimeError("daemon does not support paged reads; run `tg daemon stop` and retry") + + pages.append(page) + remaining -= len(page) + next_offset_id = page[0]["id"] + + if len(page) < page_limit: + break + + return [msg for page in reversed(pages) for msg in page] + + def _daemon_status() -> None: if not _PID_PATH.exists(): print("stopped (no pid file)") @@ -124,6 +158,8 @@ def main() -> None: r = sub.add_parser("read", help="Read messages from a chat") r.add_argument("chat") r.add_argument("--limit", "-n", type=int, default=20) + r.add_argument("--batch-size", type=int, default=_MAX_READ_BATCH, help=f"messages per daemon request, max {_MAX_READ_BATCH}") + r.add_argument("--offset-id", type=int, default=None, help="read messages older than this Telegram message id") s = sub.add_parser("search", help="Search messages in a chat") s.add_argument("chat") @@ -163,7 +199,11 @@ def main() -> None: params["query"] = args.query _print(asyncio.run(_call("list_dialogs", params))) elif args.cmd == "read": - _print(asyncio.run(_call("get_recent_messages", {"chat": args.chat, "limit": args.limit}))) + try: + _print(asyncio.run(_read_messages(args.chat, args.limit, args.batch_size, args.offset_id))) + except (RuntimeError, ValueError) as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) elif args.cmd == "search": _print(asyncio.run(_call("search_messages", {"chat": args.chat, "query": args.query, "limit": args.limit}))) elif args.cmd == "info": diff --git a/plugins/dotagents/skills/tg/SKILL.md b/plugins/dotagents/skills/tg/SKILL.md index 6e8be49..d31d635 100644 --- a/plugins/dotagents/skills/tg/SKILL.md +++ b/plugins/dotagents/skills/tg/SKILL.md @@ -14,7 +14,7 @@ tg dialogs # list recent chats (default 20) tg dialogs --query "Berlin" # filter by name substring tg dialogs --limit 50 # more results -tg read [--limit N] # read recent messages +tg read [--limit N] # read recent messages, batched automatically tg search [-n N] # search within a chat tg info # chat metadata @@ -40,6 +40,12 @@ All commands output JSON. Pipe through `jq` for filtering: tg read wq67753 -n 5 | jq '.[] | select(.sender_name == "Модник") | .text' ``` +For larger history reads, `tg read` fetches daemon-safe batches and joins them into one JSON array: + +```bash +tg read wq67753 -n 1000 > messages.json +``` + ## Architecture A singleton daemon holds the Telethon MTProto session. The CLI and MCP servers proxy through a Unix domain socket. Multiple Claude Code sessions can read Telegram simultaneously without auth conflicts. diff --git a/skills/tg/SKILL.md b/skills/tg/SKILL.md index 6e8be49..d31d635 100644 --- a/skills/tg/SKILL.md +++ b/skills/tg/SKILL.md @@ -14,7 +14,7 @@ tg dialogs # list recent chats (default 20) tg dialogs --query "Berlin" # filter by name substring tg dialogs --limit 50 # more results -tg read [--limit N] # read recent messages +tg read [--limit N] # read recent messages, batched automatically tg search [-n N] # search within a chat tg info # chat metadata @@ -40,6 +40,12 @@ All commands output JSON. Pipe through `jq` for filtering: tg read wq67753 -n 5 | jq '.[] | select(.sender_name == "Модник") | .text' ``` +For larger history reads, `tg read` fetches daemon-safe batches and joins them into one JSON array: + +```bash +tg read wq67753 -n 1000 > messages.json +``` + ## Architecture A singleton daemon holds the Telethon MTProto session. The CLI and MCP servers proxy through a Unix domain socket. Multiple Claude Code sessions can read Telegram simultaneously without auth conflicts.