Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion mcp/telegram-readonly/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The daemon auto-starts on first use and shuts down after 30 min idle.

```bash
tg dialogs [--query Q] [--limit N] # list chats
tg read <chat> [--limit N] # read messages
tg read <chat> [--limit N] # read messages, batched automatically
tg search <chat> <query> [--limit N] # search in a chat
tg info <chat> # chat metadata
tg daemon status|start|stop|log # manage daemon
Expand All @@ -34,6 +34,13 @@ Output is JSON. Pipe through `jq` for filtering:
tg read max_akhmedov -n 10 | jq '.[] | select(.sender_name != "K K") | .text'
```

Large reads are paged through the daemon in batches of up to 100 messages,
then printed as one JSON array:

```bash
tg read max_akhmedov -n 1000 > messages.json
```

## Why CLI + skill, not MCP

The MCP server is kept for backward compatibility but CLI is the primary
Expand Down
15 changes: 12 additions & 3 deletions mcp/telegram-readonly/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def _require_env() -> tuple[int, str, str]:
_PID_PATH = _STATE_DIR / "daemon.pid"
_LOG_PATH = _STATE_DIR / "daemon.log"
_IDLE_TIMEOUT = 1800
_MAX_MESSAGES_PER_CALL = 100


# ── Helpers (shared by daemon) ───────────────────────────────────────────
Expand Down Expand Up @@ -183,7 +184,12 @@ async def handle_method(method: str, params: dict) -> Any:
if method == "get_recent_messages":
entity = await _resolve_chat(client, params["chat"])
msgs = []
async for msg in client.iter_messages(entity, limit=max(1, min(params.get("limit", 30), 200))):
kwargs = {
"limit": max(1, min(params.get("limit", 30), _MAX_MESSAGES_PER_CALL)),
}
if params.get("offset_id") is not None:
kwargs["offset_id"] = int(params["offset_id"])
async for msg in client.iter_messages(entity, **kwargs):
msgs.append(_message_to_dict(msg))
return list(reversed(msgs))
if method == "search_messages":
Expand Down Expand Up @@ -353,9 +359,12 @@ async def list_dialogs(limit: int = 50, query: str | None = None) -> Any:


@mcp.tool()
async def get_recent_messages(chat: str, limit: int = 30) -> Any:
async def get_recent_messages(chat: str, limit: int = 30, offset_id: int | None = None) -> Any:
"""Read recent messages from a chat by username, id, exact title, or title substring."""
return await _proxy_call("get_recent_messages", {"chat": chat, "limit": limit})
params: dict = {"chat": chat, "limit": limit}
if offset_id is not None:
params["offset_id"] = offset_id
return await _proxy_call("get_recent_messages", params)


@mcp.tool()
Expand Down
72 changes: 72 additions & 0 deletions mcp/telegram-readonly/test_tg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from __future__ import annotations

import importlib.util
import unittest
from importlib.machinery import SourceFileLoader
from pathlib import Path
from typing import Any
from unittest.mock import patch


ROOT = Path(__file__).resolve().parent


def _load_tg() -> Any:
spec = importlib.util.spec_from_loader("tg_cli", SourceFileLoader("tg_cli", str(ROOT / "tg")))
assert spec is not None
assert spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module


class ReadMessagesTest(unittest.IsolatedAsyncioTestCase):
async def test_read_messages_pages_oldest_to_newest(self) -> None:
tg = _load_tg()
calls: list[dict] = []
pages = [
[
{"id": 4, "text": "four"},
{"id": 5, "text": "five"},
],
[
{"id": 2, "text": "two"},
{"id": 3, "text": "three"},
],
[
{"id": 1, "text": "one"},
],
]

async def fake_call(method: str, params: dict) -> list[dict]:
self.assertEqual(method, "get_recent_messages")
calls.append(params)
return pages[len(calls) - 1]

with patch.object(tg, "_call", fake_call):
messages = await tg._read_messages("chat", limit=5, batch_size=2)

self.assertEqual([m["id"] for m in messages], [1, 2, 3, 4, 5])
self.assertEqual(calls, [
{"chat": "chat", "limit": 2},
{"chat": "chat", "limit": 2, "offset_id": 4},
{"chat": "chat", "limit": 1, "offset_id": 2},
])

async def test_read_messages_rejects_non_paging_daemon(self) -> None:
tg = _load_tg()
pages = [
[{"id": 4}, {"id": 5}],
[{"id": 4}, {"id": 5}],
]

async def fake_call(method: str, params: dict) -> list[dict]:
return pages.pop(0)

with patch.object(tg, "_call", fake_call):
with self.assertRaisesRegex(RuntimeError, "paged reads"):
await tg._read_messages("chat", limit=4, batch_size=2)


if __name__ == "__main__":
unittest.main()
42 changes: 41 additions & 1 deletion mcp/telegram-readonly/tg
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ _STATE_DIR = Path(os.getenv(
_SOCK_PATH = _STATE_DIR / "daemon.sock"
_PID_PATH = _STATE_DIR / "daemon.pid"
_LOG_PATH = _STATE_DIR / "daemon.log"
_MAX_READ_BATCH = 100


async def _ensure_daemon() -> None:
Expand Down Expand Up @@ -87,6 +88,39 @@ def _print(data: any) -> None:
print(json.dumps(data, ensure_ascii=False, indent=2))


async def _read_messages(chat: str, limit: int, batch_size: int, offset_id: int | None = None) -> list[dict]:
if limit < 1:
raise ValueError("--limit must be at least 1")
if batch_size < 1:
raise ValueError("--batch-size must be at least 1")
batch_size = min(batch_size, _MAX_READ_BATCH)

pages: list[list[dict]] = []
remaining = limit
next_offset_id = offset_id

while remaining > 0:
page_limit = min(batch_size, remaining)
params: dict = {"chat": chat, "limit": page_limit}
if next_offset_id is not None:
params["offset_id"] = next_offset_id

page = await _call("get_recent_messages", params)
if not page:
break
if next_offset_id is not None and page[0]["id"] >= next_offset_id:
raise RuntimeError("daemon does not support paged reads; run `tg daemon stop` and retry")

pages.append(page)
remaining -= len(page)
next_offset_id = page[0]["id"]

if len(page) < page_limit:
break

return [msg for page in reversed(pages) for msg in page]


def _daemon_status() -> None:
if not _PID_PATH.exists():
print("stopped (no pid file)")
Expand Down Expand Up @@ -124,6 +158,8 @@ def main() -> None:
r = sub.add_parser("read", help="Read messages from a chat")
r.add_argument("chat")
r.add_argument("--limit", "-n", type=int, default=20)
r.add_argument("--batch-size", type=int, default=_MAX_READ_BATCH, help=f"messages per daemon request, max {_MAX_READ_BATCH}")
r.add_argument("--offset-id", type=int, default=None, help="read messages older than this Telegram message id")

s = sub.add_parser("search", help="Search messages in a chat")
s.add_argument("chat")
Expand Down Expand Up @@ -163,7 +199,11 @@ def main() -> None:
params["query"] = args.query
_print(asyncio.run(_call("list_dialogs", params)))
elif args.cmd == "read":
_print(asyncio.run(_call("get_recent_messages", {"chat": args.chat, "limit": args.limit})))
try:
_print(asyncio.run(_read_messages(args.chat, args.limit, args.batch_size, args.offset_id)))
except (RuntimeError, ValueError) as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
elif args.cmd == "search":
_print(asyncio.run(_call("search_messages", {"chat": args.chat, "query": args.query, "limit": args.limit})))
elif args.cmd == "info":
Expand Down
8 changes: 7 additions & 1 deletion plugins/dotagents/skills/tg/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tg dialogs # list recent chats (default 20)
tg dialogs --query "Berlin" # filter by name substring
tg dialogs --limit 50 # more results

tg read <chat> [--limit N] # read recent messages
tg read <chat> [--limit N] # read recent messages, batched automatically
tg search <chat> <query> [-n N] # search within a chat
tg info <chat> # chat metadata

Expand All @@ -40,6 +40,12 @@ All commands output JSON. Pipe through `jq` for filtering:
tg read wq67753 -n 5 | jq '.[] | select(.sender_name == "Модник") | .text'
```

For larger history reads, `tg read` fetches daemon-safe batches and joins them into one JSON array:

```bash
tg read wq67753 -n 1000 > messages.json
```

## Architecture

A singleton daemon holds the Telethon MTProto session. The CLI and MCP servers proxy through a Unix domain socket. Multiple Claude Code sessions can read Telegram simultaneously without auth conflicts.
Expand Down
8 changes: 7 additions & 1 deletion skills/tg/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tg dialogs # list recent chats (default 20)
tg dialogs --query "Berlin" # filter by name substring
tg dialogs --limit 50 # more results

tg read <chat> [--limit N] # read recent messages
tg read <chat> [--limit N] # read recent messages, batched automatically
tg search <chat> <query> [-n N] # search within a chat
tg info <chat> # chat metadata

Expand All @@ -40,6 +40,12 @@ All commands output JSON. Pipe through `jq` for filtering:
tg read wq67753 -n 5 | jq '.[] | select(.sender_name == "Модник") | .text'
```

For larger history reads, `tg read` fetches daemon-safe batches and joins them into one JSON array:

```bash
tg read wq67753 -n 1000 > messages.json
```

## Architecture

A singleton daemon holds the Telethon MTProto session. The CLI and MCP servers proxy through a Unix domain socket. Multiple Claude Code sessions can read Telegram simultaneously without auth conflicts.
Expand Down
Loading