Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions internal/gateway/methods/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ func (m *HeartbeatMethods) handleGet(ctx context.Context, client *gateway.Client
func (m *HeartbeatMethods) handleSet(ctx context.Context, client *gateway.Client, req *protocol.RequestFrame) {
locale := store.LocaleFromContext(ctx)
var params struct {
AgentID string `json:"agentId"`
Enabled *bool `json:"enabled"`
IntervalSec *int `json:"intervalSec"`
Prompt *string `json:"prompt"`
ProviderName *string `json:"providerName"`
Model *string `json:"model"`
IsolatedSession *bool `json:"isolatedSession"`
LightContext *bool `json:"lightContext"`
AckMaxChars *int `json:"ackMaxChars"`
MaxRetries *int `json:"maxRetries"`
AgentID string `json:"agentId"`
Enabled *bool `json:"enabled"`
IntervalSec *int `json:"intervalSec"`
Prompt *string `json:"prompt"`
ProviderName *string `json:"providerName"`
Model *string `json:"model"`
IsolatedSession *bool `json:"isolatedSession"`
LightContext *bool `json:"lightContext"`
AckMaxChars *int `json:"ackMaxChars"`
MaxRetries *int `json:"maxRetries"`
ActiveHoursStart *string `json:"activeHoursStart"`
ActiveHoursEnd *string `json:"activeHoursEnd"`
Timezone *string `json:"timezone"`
Expand Down Expand Up @@ -214,7 +214,8 @@ func (m *HeartbeatMethods) handleSet(ctx context.Context, client *gateway.Client
}

if hb.Enabled && hb.NextRunAt == nil {
nextRun := time.Now().Add(time.Duration(hb.IntervalSec)*time.Second + store.StaggerOffset(hb.AgentID, hb.IntervalSec))
anchor := hb.LastRunAt
nextRun := store.NextHeartbeatRunAt(time.Now(), hb.AgentID, hb.IntervalSec, anchor)
hb.NextRunAt = &nextRun
}

Expand Down Expand Up @@ -260,7 +261,8 @@ func (m *HeartbeatMethods) handleToggle(ctx context.Context, client *gateway.Cli

hb.Enabled = params.Enabled
if params.Enabled && hb.NextRunAt == nil {
nextRun := time.Now().Add(time.Duration(hb.IntervalSec) * time.Second)
anchor := hb.LastRunAt
nextRun := store.NextHeartbeatRunAt(time.Now(), hb.AgentID, hb.IntervalSec, anchor)
hb.NextRunAt = &nextRun
}

Expand Down Expand Up @@ -415,8 +417,8 @@ func (m *HeartbeatMethods) handleChecklistSet(ctx context.Context, client *gatew
}

client.SendResponse(protocol.NewOKResponse(req.ID, map[string]any{
"ok": true,
"length": len([]rune(params.Content)),
"ok": true,
"length": len([]rune(params.Content)),
}))
emitAudit(m.eventBus, client, "heartbeat.checklist.set", "heartbeat", params.AgentID)
}
Expand Down
17 changes: 13 additions & 4 deletions internal/heartbeat/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func NewTicker(cfg TickerConfig) *Ticker {
msgBus: cfg.MsgBus,
sched: cfg.Sched,
runAgent: cfg.RunAgent,
wakeCh: make(chan uuid.UUID, 16),
stopCh: make(chan struct{}),
wakeCh: make(chan uuid.UUID, 16),
stopCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -369,7 +369,11 @@ func (t *Ticker) finishRun(ctx context.Context, hb store.AgentHeartbeat, session
if errMsg != "" {
newState.LastError = errMsg
}
nextRun := now.Add(time.Duration(hb.IntervalSec) * time.Second)
anchor := hb.NextRunAt
if anchor == nil {
anchor = hb.LastRunAt
}
nextRun := store.NextHeartbeatRunAt(now, hb.AgentID, hb.IntervalSec, anchor)
newState.NextRunAt = &nextRun

if err := t.store.UpdateState(ctx, hb.ID, newState); err != nil {
Expand Down Expand Up @@ -415,7 +419,12 @@ func (t *Ticker) logSkipped(ctx context.Context, hb store.AgentHeartbeat, reason
}

func (t *Ticker) advanceNextRun(ctx context.Context, hb store.AgentHeartbeat) {
nextRun := time.Now().Add(time.Duration(hb.IntervalSec) * time.Second)
now := time.Now()
anchor := hb.NextRunAt
if anchor == nil {
anchor = hb.LastRunAt
}
nextRun := store.NextHeartbeatRunAt(now, hb.AgentID, hb.IntervalSec, anchor)
state := store.HeartbeatState{
NextRunAt: &nextRun,
LastStatus: deref(hb.LastStatus),
Expand Down
20 changes: 20 additions & 0 deletions internal/store/heartbeat_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ func StaggerOffset(agentID uuid.UUID, intervalSec int) time.Duration {
return time.Duration(offset) * time.Second
}

// NextHeartbeatRunAt returns the next stable-phase run time for a heartbeat.
//
// A non-positive intervalSec yields now unchanged. When anchor is nil or the
// zero time there is no previous slot to stay in phase with, so the result is
// now plus one interval plus the deterministic StaggerOffset for agentID,
// which lets agents fan out. Otherwise the result is the earliest
// anchor + k*interval (k >= 1) that is strictly after now, so late completions
// and missed slots do not make the schedule drift.
func NextHeartbeatRunAt(now time.Time, agentID uuid.UUID, intervalSec int, anchor *time.Time) time.Time {
	if intervalSec <= 0 {
		return now
	}

	interval := time.Duration(intervalSec) * time.Second
	if anchor == nil || anchor.IsZero() {
		return now.Add(interval + StaggerOffset(agentID, intervalSec))
	}

	next := anchor.Add(interval)
	if next.After(now) {
		return next
	}

	// The slot right after the anchor has already passed. Skip every missed
	// slot with a single division instead of stepping one interval at a time,
	// so the cost does not grow with how long the heartbeat has been idle.
	missed := now.Sub(next) / interval
	return next.Add((missed + 1) * interval)
}

// HeartbeatEvent represents a heartbeat lifecycle event sent to subscribers.
type HeartbeatEvent struct {
Action string `json:"action" db:"-"` // "running", "completed", "suppressed", "error", "skipped"
Expand Down
49 changes: 49 additions & 0 deletions internal/store/heartbeat_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package store

import (
"testing"
"time"

"github.com/google/uuid"
)

// TestNextHeartbeatRunAt_InitialScheduleUsesDeterministicStagger checks that a
// nil anchor schedules the run one interval plus the agent's StaggerOffset
// after now.
func TestNextHeartbeatRunAt_InitialScheduleUsesDeterministicStagger(t *testing.T) {
	const intervalSec = 1800
	var (
		now     = time.Date(2026, time.March, 28, 12, 0, 0, 0, time.UTC)
		agentID = uuid.MustParse("11111111-1111-1111-1111-111111111111")
	)

	got := NextHeartbeatRunAt(now, agentID, intervalSec, nil)

	firstDelay := time.Duration(intervalSec)*time.Second + StaggerOffset(agentID, intervalSec)
	if want := now.Add(firstDelay); !got.Equal(want) {
		t.Fatalf("NextHeartbeatRunAt initial schedule = %v, want %v", got, want)
	}
}

func TestNextHeartbeatRunAt_AdvancesFromAnchorWithoutDrift(t *testing.T) {
now := time.Date(2026, time.March, 28, 12, 0, 0, 0, time.UTC)
agentID := uuid.MustParse("22222222-2222-2222-2222-222222222222")
intervalSec := 300
anchor := now.Add(-7 * time.Minute)

want := now.Add(3 * time.Minute)
got := NextHeartbeatRunAt(now, agentID, intervalSec, &anchor)

if !got.Equal(want) {
t.Fatalf("NextHeartbeatRunAt anchored schedule = %v, want %v", got, want)
}
}

func TestNextHeartbeatRunAt_SkipsMissedIntervalsToFuturePhase(t *testing.T) {
now := time.Date(2026, time.March, 28, 12, 0, 0, 0, time.UTC)
agentID := uuid.MustParse("33333333-3333-3333-3333-333333333333")
intervalSec := 300
anchor := now.Add(-20 * time.Minute)

want := now.Add(5 * time.Minute)
got := NextHeartbeatRunAt(now, agentID, intervalSec, &anchor)

if !got.Equal(want) {
t.Fatalf("NextHeartbeatRunAt catch-up schedule = %v, want %v", got, want)
}
}
6 changes: 4 additions & 2 deletions internal/tools/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ func (t *HeartbeatTool) handleSet(ctx context.Context, agentID uuid.UUID, args m
}

if hb.Enabled && hb.NextRunAt == nil {
nextRun := time.Now().Add(time.Duration(hb.IntervalSec)*time.Second + store.StaggerOffset(hb.AgentID, hb.IntervalSec))
anchor := hb.LastRunAt
nextRun := store.NextHeartbeatRunAt(time.Now(), hb.AgentID, hb.IntervalSec, anchor)
hb.NextRunAt = &nextRun
}

Expand All @@ -253,7 +254,8 @@ func (t *HeartbeatTool) handleToggle(ctx context.Context, agentID uuid.UUID, ena
}
hb.Enabled = enabled
if enabled && hb.NextRunAt == nil {
nextRun := time.Now().Add(time.Duration(hb.IntervalSec) * time.Second)
anchor := hb.LastRunAt
nextRun := store.NextHeartbeatRunAt(time.Now(), hb.AgentID, hb.IntervalSec, anchor)
hb.NextRunAt = &nextRun
}
if err := t.hbStore.Upsert(ctx, hb); err != nil {
Expand Down
Loading