Skip to content

Commit 6ad3499

Browse files
committed
Add heartbeat module
1 parent 3f54164 commit 6ad3499

3 files changed

Lines changed: 302 additions & 16 deletions

File tree

src/agent.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,37 @@ pub async fn process_message(
197197
Ok(final_content)
198198
}
199199

200+
// ---------------------------------------------------------------------------
201+
// Heartbeat agent entry point (one-shot, no session)
202+
// ---------------------------------------------------------------------------
203+
204+
/// One-shot run for heartbeat: same context as `process_message` but with empty
205+
/// history and summary. No session load or save.
206+
pub async fn process_heartbeat_message(
207+
llm: &HttpProvider,
208+
registry: &ToolRegistry,
209+
workspace_path: &Path,
210+
model: &str,
211+
chat_id: &str,
212+
user_message: &str,
213+
tool_ctx: &ToolCtx,
214+
) -> Result<String, AgentError> {
215+
let skills_summary = skills::build_skills_summary(workspace_path)?;
216+
let tool_summaries = registry.summaries();
217+
let today = crate::workspace::today_yyyymmdd();
218+
let messages = build_messages(
219+
workspace_path,
220+
&[],
221+
"",
222+
user_message,
223+
Some(chat_id),
224+
&skills_summary,
225+
&tool_summaries,
226+
Some(&today),
227+
);
228+
run_agent_loop(llm, registry, messages, tool_ctx, model, MAX_ITERATIONS).await
229+
}
230+
200231
// ---------------------------------------------------------------------------
201232
// Subagent runner (background; called by SubagentManager::spawn)
202233
// ---------------------------------------------------------------------------

src/heartbeat.rs

Lines changed: 202 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,202 @@
1-
//! Timer loop: read workspace/HEARTBEAT.md, push content as one-shot user message to agent.
1+
//! Timer loop: read workspace/HEARTBEAT.md, push one InboundMsg per task to agent.
2+
//!
3+
//! Each markdown bullet (`- `) in HEARTBEAT.md becomes its own agent run (one-shot, no session).
4+
//! Heartbeat pushes onto the same `inbound_tx` as Telegram and cron; the main loop branches on
5+
//! `channel == "heartbeat"` to call `process_heartbeat_message` instead of `process_message`.
6+
7+
use std::path::{Path, PathBuf};
8+
use std::sync::atomic::{AtomicI64, Ordering};
9+
use std::sync::Arc;
10+
use std::time::Duration;
11+
12+
use tokio::sync::mpsc;
13+
14+
use crate::telegram::InboundMsg;
15+
16+
/// Parse markdown bullet tasks from HEARTBEAT.md content.
17+
///
18+
/// Lines whose trimmed form starts with `"- "` are tasks; everything else is ignored.
19+
/// Inner whitespace around the task text is trimmed; blank tasks are dropped.
20+
pub fn parse_tasks(content: &str) -> Vec<String> {
21+
content
22+
.lines()
23+
.filter_map(|line| {
24+
line.trim()
25+
.strip_prefix("- ")
26+
.map(|task| task.trim().to_string())
27+
})
28+
.filter(|task| !task.is_empty())
29+
.collect()
30+
}
31+
32+
/// Read and parse tasks from `workspace/HEARTBEAT.md`.
33+
///
34+
/// Returns an empty vec if the file does not exist or cannot be read.
35+
/// Sync I/O is fine: this is called at most once per N-minute tick.
36+
fn read_tasks(workspace: &Path) -> Vec<String> {
37+
let path = workspace.join("HEARTBEAT.md");
38+
if !path.exists() {
39+
return vec![];
40+
}
41+
let content = std::fs::read_to_string(&path).unwrap_or_default();
42+
parse_tasks(&content)
43+
}
44+
45+
/// Spawn the heartbeat runner.
46+
///
47+
/// Every `interval_minutes` minutes: read `HEARTBEAT.md`, and for each task push one
48+
/// `InboundMsg { channel: "heartbeat" }` onto `inbound_tx`. The main loop will call
49+
/// `process_heartbeat_message` once per message — N agent calls per tick (N = tasks).
50+
///
51+
/// `last_chat_id` is loaded on each tick to find the current active Telegram chat.
52+
/// If it is `0` (no user has messaged yet) the messages are still pushed; main.rs
53+
/// drops the reply in that case.
54+
///
55+
/// # Panics
56+
/// Panics if `interval_minutes == 0` (caller must check before calling).
57+
pub fn spawn_heartbeat_runner(
58+
workspace: PathBuf,
59+
interval_minutes: u64,
60+
inbound_tx: mpsc::Sender<InboundMsg>,
61+
last_chat_id: Arc<AtomicI64>,
62+
) -> tokio::task::JoinHandle<()> {
63+
assert!(interval_minutes >= 1, "heartbeat interval_minutes must be >= 1");
64+
tokio::spawn(async move {
65+
let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60));
66+
// Skip the immediately-firing first tick so the first real tick is one full interval out.
67+
interval.tick().await;
68+
loop {
69+
interval.tick().await;
70+
let tasks = read_tasks(&workspace);
71+
if tasks.is_empty() {
72+
continue;
73+
}
74+
let chat_id = last_chat_id.load(Ordering::Relaxed);
75+
for task in tasks {
76+
let msg = InboundMsg {
77+
chat_id,
78+
user_id: 0,
79+
text: format!("[Heartbeat Task] {task}"),
80+
channel: "heartbeat".to_string(),
81+
};
82+
if inbound_tx.send(msg).await.is_err() {
83+
// Receiver closed (main loop exited); nothing more to do.
84+
return;
85+
}
86+
}
87+
}
88+
})
89+
}
90+
91+
#[cfg(test)]
92+
mod tests {
93+
use super::*;
94+
95+
// --- parse_tasks ---
96+
97+
#[test]
98+
fn parse_empty() {
99+
assert!(parse_tasks("").is_empty());
100+
}
101+
102+
#[test]
103+
fn parse_single_bullet() {
104+
assert_eq!(parse_tasks("- Check weather"), ["Check weather"]);
105+
}
106+
107+
#[test]
108+
fn parse_multiple_bullets() {
109+
let tasks = parse_tasks("- First\n- Second\n- Third");
110+
assert_eq!(tasks, ["First", "Second", "Third"]);
111+
}
112+
113+
#[test]
114+
fn parse_skips_non_bullet_lines() {
115+
let content = "# Heartbeat\n\nProse.\n\n- Do thing\n<!-- comment -->\n- Another";
116+
assert_eq!(parse_tasks(content), ["Do thing", "Another"]);
117+
}
118+
119+
#[test]
120+
fn parse_unicode_task() {
121+
assert_eq!(parse_tasks("- こんにちは 🦀"), ["こんにちは 🦀"]);
122+
}
123+
124+
#[test]
125+
fn parse_strips_inner_whitespace() {
126+
assert_eq!(parse_tasks("- \t trim me \t"), ["trim me"]);
127+
}
128+
129+
#[test]
130+
fn parse_skips_empty_bullets() {
131+
// A bare "- " with nothing after it is dropped.
132+
assert_eq!(parse_tasks("- \n- real task"), ["real task"]);
133+
}
134+
135+
#[test]
136+
fn parse_mixed_indentation_ignored() {
137+
// Lines that do NOT start with "- " after trimming are skipped.
138+
let tasks = parse_tasks(" - indented\n- normal");
139+
assert_eq!(tasks, ["indented", "normal"]);
140+
}
141+
142+
// --- read_tasks ---
143+
144+
#[test]
145+
fn read_tasks_returns_empty_when_file_missing() {
146+
let dir = std::env::temp_dir().join("icrab_hb_no_file_test");
147+
// Ensure no HEARTBEAT.md exists in this temp dir.
148+
let _ = std::fs::remove_file(dir.join("HEARTBEAT.md"));
149+
assert!(read_tasks(&dir).is_empty());
150+
}
151+
152+
#[test]
153+
fn read_tasks_parses_file() {
154+
let dir = std::env::temp_dir().join("icrab_hb_read_test");
155+
std::fs::create_dir_all(&dir).unwrap();
156+
std::fs::write(dir.join("HEARTBEAT.md"), "- Alpha\n- Beta\n").unwrap();
157+
let tasks = read_tasks(&dir);
158+
assert_eq!(tasks, ["Alpha", "Beta"]);
159+
let _ = std::fs::remove_dir_all(&dir);
160+
}
161+
162+
// --- message format ---
163+
164+
#[tokio::test]
165+
async fn messages_have_correct_format_and_channel() {
166+
use std::sync::atomic::AtomicI64;
167+
use tokio::sync::mpsc;
168+
169+
let dir = std::env::temp_dir().join("icrab_hb_msg_fmt_test");
170+
std::fs::create_dir_all(&dir).unwrap();
171+
std::fs::write(dir.join("HEARTBEAT.md"), "- Task A\n- Task B\n").unwrap();
172+
173+
let last_chat_id = Arc::new(AtomicI64::new(42));
174+
let tasks = read_tasks(&dir);
175+
assert_eq!(tasks.len(), 2);
176+
177+
let (tx, mut rx) = mpsc::channel(8);
178+
let chat_id = last_chat_id.load(Ordering::Relaxed);
179+
for task in &tasks {
180+
tx.send(InboundMsg {
181+
chat_id,
182+
user_id: 0,
183+
text: format!("[Heartbeat Task] {task}"),
184+
channel: "heartbeat".to_string(),
185+
})
186+
.await
187+
.unwrap();
188+
}
189+
drop(tx);
190+
191+
let a = rx.recv().await.unwrap();
192+
assert_eq!(a.chat_id, 42);
193+
assert_eq!(a.text, "[Heartbeat Task] Task A");
194+
assert_eq!(a.channel, "heartbeat");
195+
assert_eq!(a.user_id, 0);
196+
197+
let b = rx.recv().await.unwrap();
198+
assert_eq!(b.text, "[Heartbeat Task] Task B");
199+
200+
let _ = std::fs::remove_dir_all(&dir);
201+
}
202+
}

src/main.rs

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
//! Single binary: runs Telegram poller + agent loop. Config: `~/.icrab/config.toml` or env.
44
55
use std::path::PathBuf;
6+
use std::sync::atomic::{AtomicI64, Ordering};
67
use std::sync::Arc;
78

89
use tokio::sync::mpsc;
@@ -11,6 +12,7 @@ use icrab::agent;
1112
use icrab::agent::subagent_manager::SubagentManager;
1213
use icrab::config;
1314
use icrab::cron_runner;
15+
use icrab::heartbeat;
1416
use icrab::llm::HttpProvider;
1517
use icrab::telegram::{self, OutboundMsg};
1618
use icrab::tools;
@@ -83,9 +85,34 @@ async fn main() {
8385
60,
8486
);
8587
registry.register(CronTool::new(Arc::clone(&cron_store)));
88+
89+
// Track the last Telegram/cron chat_id so heartbeat replies go to the right chat.
90+
let last_chat_id: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
91+
92+
// Spawn heartbeat if configured with interval_minutes >= 1.
93+
let heartbeat_interval = cfg
94+
.heartbeat
95+
.as_ref()
96+
.and_then(|h| h.interval_minutes)
97+
.unwrap_or(0);
98+
if heartbeat_interval >= 1 {
99+
heartbeat::spawn_heartbeat_runner(
100+
workspace.clone(),
101+
heartbeat_interval,
102+
inbound_tx.clone(),
103+
Arc::clone(&last_chat_id),
104+
);
105+
eprintln!("heartbeat runner started (interval: {} min)", heartbeat_interval);
106+
}
107+
86108
drop(inbound_tx);
87109

88110
while let Some(msg) = inbound_rx.recv().await {
111+
// Update last_chat_id for non-heartbeat sources so replies go to the right place.
112+
if msg.channel != "heartbeat" {
113+
last_chat_id.store(msg.chat_id, Ordering::Relaxed);
114+
}
115+
89116
let tool_ctx = tools::ToolCtx {
90117
workspace: workspace.clone(),
91118
restrict_to_workspace: restrict,
@@ -94,23 +121,50 @@ async fn main() {
94121
outbound_tx: Some(Arc::new(outbound_tx.clone())),
95122
};
96123
let chat_id_str = msg.chat_id.to_string();
97-
let reply = match agent::process_message(
98-
&llm,
99-
&registry,
100-
&workspace,
101-
model,
102-
&chat_id_str,
103-
&msg.text,
104-
&tool_ctx,
105-
)
106-
.await
107-
{
108-
Ok(r) => r,
109-
Err(e) => {
110-
eprintln!("agent error: {}", e);
111-
format!("Error: {}.", e)
124+
125+
let reply = if msg.channel == "heartbeat" {
126+
match agent::process_heartbeat_message(
127+
&llm,
128+
&registry,
129+
&workspace,
130+
model,
131+
&chat_id_str,
132+
&msg.text,
133+
&tool_ctx,
134+
)
135+
.await
136+
{
137+
Ok(r) => r,
138+
Err(e) => {
139+
eprintln!("heartbeat agent error: {}", e);
140+
format!("Error: {}.", e)
141+
}
142+
}
143+
} else {
144+
match agent::process_message(
145+
&llm,
146+
&registry,
147+
&workspace,
148+
model,
149+
&chat_id_str,
150+
&msg.text,
151+
&tool_ctx,
152+
)
153+
.await
154+
{
155+
Ok(r) => r,
156+
Err(e) => {
157+
eprintln!("agent error: {}", e);
158+
format!("Error: {}.", e)
159+
}
112160
}
113161
};
162+
163+
// Heartbeat with no known chat (chat_id == 0): no user has messaged yet, drop reply.
164+
if msg.channel == "heartbeat" && msg.chat_id == 0 {
165+
continue;
166+
}
167+
114168
let _ = outbound_tx
115169
.send(OutboundMsg {
116170
chat_id: msg.chat_id,

0 commit comments

Comments
 (0)