Skip to content

Commit 676a105

Browse files
committed
Optional periodic import of history while listening for messages
1 parent 06fb3a3 commit 676a105

File tree

5 files changed

+168
-15
lines changed

5 files changed

+168
-15
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## [3.2.0] - pending
4+
5+
* Optional periodic import of history while listening for messages
6+
37
## [3.1.2] - 2023-11-08
48

59
* Fixed crash if username, first_name or last_name of sender are not defined (thanks to @cololi)
@@ -45,4 +49,4 @@ Chat Import: Fixed not correctly resolving entities if ID is given, prefix names
4549

4650
## [1.0] - 2020-10-22
4751

48-
Initial release
52+
Initial release

README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,34 @@ output_map:
8282
"message.translated": "translated_text"
8383
```
8484

85+
## Periodically import history
86+
87+
In some cases you might want to periodically import the history while listening for messages.
88+
89+
This feature is disabled by default but can be enabled by adding the following configuration to your `config.yml`:
90+
91+
```yaml
92+
periodic_import:
93+
interval: 1d
94+
range: 7d
95+
```
96+
97+
At least an interval is required. If no range is given, everything is imported.
98+
99+
You might also specify multiple time units like "1d12h" which means "1 days and 12 hours".
100+
101+
The following units are supported:
102+
103+
| Unit | Meaning | Example |
104+
|------|---------|---------|
105+
| y | Years | 1y |
106+
| mo | Months | 12mo |
107+
| w | Weeks | 30w |
108+
| d | Days | 7d |
109+
| h | Hours | 24h |
110+
| m | Minutes | 60m |
111+
| s | Seconds | 60s |
112+
85113
## Initial setup
86114

87115
When started for the first time, the application will ask you to connect with your Telegram account.
@@ -90,4 +118,4 @@ When started for the first time, the application will ask you to connect with yo
90118

91119
* `listen` - Listen for chat messages and write them to the configured outputs
92120
* `import-history` - Import the chat history (the complete history or only for specific chats or a specific time range)
93-
* `list-chats` - List available chats
121+
* `list-chats` - List available chats

config.sample.yml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@ telegram:
1818
- <id 2>
1919
- ...
2020

21+
# Periodically import history
22+
# The time properties support multiple units: y (years), mo (months), w (weeks), d (days), h (hours), m (minutes), s (seconds)
23+
# Those units can be combined, i.e. "1d12h" means "1 days and 12 hours".
24+
periodic_import:
25+
# Interval between periodic imports
26+
interval: 1d
27+
28+
# Range to import, e.g. 7d to import the last 7 days
29+
# Omit to always import everything
30+
range: 7d
31+
2132
# Configure whether messages should be translated into the specified language
2233
# Use the two-letter ISO 639-1 language code (examples: "de", "en", "es", "it")
2334
# Omit or keep empty to disable translations
@@ -154,4 +165,4 @@ outputs:
154165
chat: "get_display_name(await message.get_chat())"
155166
message: "message.text"
156167
translated_message: "translated_text"
157-
media: "media.filename if media else None"
168+
media: "media.filename if media else None"

telegram2elastic.py

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import yaml
1616

17-
from datetime import datetime
17+
from datetime import datetime, timedelta
1818
from telethon import TelegramClient, events
1919
from telethon.tl import types
2020
from telethon.tl.functions.messages import TranslateTextRequest
@@ -84,6 +84,66 @@ def set(self, path, value):
8484
node[path_parts[-1]] = value
8585

8686

87+
class TimeInterval:
88+
seconds: int
89+
90+
def __init__(self, seconds: int):
91+
self.seconds = seconds
92+
93+
@staticmethod
94+
def parse(string: str):
95+
matches = re.findall(r"(\d+)(y|mo|w|d|h|m|s)", re.sub(r"[\s,_-]+", "", string.lower()))
96+
if not matches:
97+
return None
98+
99+
total_seconds = 0
100+
101+
for value, unit in matches:
102+
value = int(value)
103+
104+
if unit == "y":
105+
total_seconds += value * 365 * 24 * 3600
106+
elif unit == "mo":
107+
total_seconds += value * 30 * 24 * 3600
108+
elif unit == "w":
109+
total_seconds += value * 7 * 24 * 3600
110+
elif unit == "d":
111+
total_seconds += value * 24 * 3600
112+
elif unit == "h":
113+
total_seconds += value * 3600
114+
elif unit == "m":
115+
total_seconds += value * 60
116+
elif unit == "s":
117+
total_seconds += value
118+
119+
return TimeInterval(total_seconds)
120+
121+
def format_human_readable(self):
122+
total_seconds = self.seconds
123+
parts = []
124+
125+
units = [
126+
("year", 365 * 24 * 3600),
127+
("month", 30 * 24 * 3600),
128+
("week", 7 * 24 * 3600),
129+
("day", 24 * 3600),
130+
("hour", 3600),
131+
("minute", 60),
132+
("second", 1),
133+
]
134+
135+
for name, unit_seconds in units:
136+
value, total_seconds = divmod(total_seconds, unit_seconds)
137+
if value:
138+
label = name if value == 1 else name + "s"
139+
parts.append(f"{value} {label}")
140+
141+
return ", ".join(parts) if parts else "0 seconds"
142+
143+
def timedelta(self):
144+
return timedelta(seconds=self.seconds)
145+
146+
87147
def json_default(value):
88148
if isinstance(value, bytes):
89149
return base64.b64encode(value).decode("ascii")
@@ -439,12 +499,7 @@ def __init__(self, config: dict, output_handler: OutputHandler):
439499
self.additional_chats = config.get("additional_chats", [])
440500
self.chat_types = config.get("chat_types", [])
441501

442-
async def import_history(self, start_date, chats):
443-
if start_date:
444-
offset_date = datetime.strptime(start_date, "%Y-%m-%d")
445-
else:
446-
offset_date = None
447-
502+
async def import_history(self, start_date: datetime = None, chats=None):
448503
if chats:
449504
chats = await self.client.get_entity(TelegramReader.prepare_chats(chats))
450505
else:
@@ -453,12 +508,12 @@ async def import_history(self, start_date, chats):
453508
for chat in chats:
454509
display_name = get_display_name(chat)
455510

456-
if offset_date:
457-
logging.log(LOG_LEVEL_INFO, "Importing history for chat '{}' starting at {}".format(display_name, offset_date.strftime("%c")))
511+
if start_date:
512+
logging.log(LOG_LEVEL_INFO, "Importing history for chat '{}' starting at {}".format(display_name, start_date.strftime("%c")))
458513
else:
459514
logging.log(LOG_LEVEL_INFO, "Importing full history for chat '{}'".format(display_name))
460515

461-
async for message in self.client.iter_messages(chat, offset_date=offset_date, reverse=True):
516+
async for message in self.client.iter_messages(chat, offset_date=start_date, reverse=True):
462517
await self.output_handler.write_message(message, self.is_chat_enabled)
463518

464519
logging.log(LOG_LEVEL_INFO, "Import finished")
@@ -479,6 +534,31 @@ async def handler(event):
479534

480535
await self.client.catch_up()
481536

537+
async def periodic_import(self, config: dict):
538+
interval = TimeInterval.parse(config.get("interval", "1d"))
539+
540+
time_range = config.get("range")
541+
if time_range:
542+
time_range = TimeInterval.parse(time_range)
543+
time_range_string = time_range.format_human_readable()
544+
else:
545+
time_range = None
546+
time_range_string = "all"
547+
548+
logging.log(LOG_LEVEL_INFO, f"Scheduling periodic import (interval: {interval.format_human_readable()}, range: {time_range_string})")
549+
550+
while True:
551+
await asyncio.sleep(interval.seconds)
552+
553+
logging.log(LOG_LEVEL_INFO, "Starting periodic import")
554+
555+
start_date = None
556+
if time_range:
557+
start_date = datetime.now() - time_range.timedelta()
558+
await self.import_history(start_date)
559+
560+
logging.log(LOG_LEVEL_INFO, f"Periodic import completed, next periodic import at {datetime.now() + interval.timedelta()}")
561+
482562
def is_chat_enabled(self, chat, chat_types=None):
483563
if chat.id in self.additional_chats:
484564
return True
@@ -559,12 +639,20 @@ def main():
559639
loop = telegram_reader.client.loop
560640

561641
if arguments.command == "import-history":
562-
loop.run_until_complete(telegram_reader.import_history(arguments.start_date, arguments.chats))
642+
start_date = arguments.start_date
643+
if start_date:
644+
start_date = datetime.strptime(start_date, "%Y-%m-%d")
645+
646+
loop.run_until_complete(telegram_reader.import_history(start_date, arguments.chats))
563647
elif arguments.command == "list-chats":
564648
loop.run_until_complete(telegram_reader.list_chats(arguments.types))
565649
elif arguments.command == "listen":
566650
loop.create_task(telegram_reader.listen())
567651

652+
periodic_import_config = config.get("periodic_import", {})
653+
if periodic_import_config.get("interval") is not None:
654+
loop.create_task(telegram_reader.periodic_import(periodic_import_config))
655+
568656
try:
569657
loop.run_forever()
570658
except KeyboardInterrupt:

test_telegram2elastic.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import pytest
22

3-
from telegram2elastic import FileSize, DottedPathDict
3+
from telegram2elastic import FileSize, DottedPathDict, TimeInterval
44

55

66
class TestFileSize:
@@ -29,3 +29,25 @@ def test(self):
2929

3030
with pytest.raises(TypeError):
3131
dotted_path_dict.set("foo.bar.baz", "other value")
32+
33+
34+
class TestTimeInterval:
35+
def test_parse(self):
36+
assert TimeInterval.parse("1h1m").seconds == 60*60 + 60
37+
assert TimeInterval.parse("1h5m").seconds == 60*60 + 60*5
38+
assert TimeInterval.parse("1d").seconds == 60*60*24
39+
assert TimeInterval.parse("1d3h10m").seconds == 60*60*24 + 60*60*3 + 60*10
40+
assert TimeInterval.parse("1y2mo").seconds == 60*60*24*365 + 60*60*24*60
41+
assert TimeInterval.parse("1mo2m").seconds == 60*60*24*30 + 60*2
42+
assert TimeInterval.parse("2m1mo").seconds == 60*60*24*30 + 60*2
43+
44+
def test_format(self):
45+
assert TimeInterval(1).format_human_readable() == "1 second"
46+
assert TimeInterval(25).format_human_readable() == "25 seconds"
47+
assert TimeInterval(60).format_human_readable() == "1 minute"
48+
assert TimeInterval(90).format_human_readable() == "1 minute, 30 seconds"
49+
assert TimeInterval(60*2).format_human_readable() == "2 minutes"
50+
assert TimeInterval(60*60).format_human_readable() == "1 hour"
51+
assert TimeInterval(60*60*24).format_human_readable() == "1 day"
52+
assert TimeInterval(60*60*24*2).format_human_readable() == "2 days"
53+
assert TimeInterval(60*60*24 + 60*60*12 + 60 + 35).format_human_readable() == "1 day, 12 hours, 1 minute, 35 seconds"

0 commit comments

Comments
 (0)