Skip to content

Commit 7feb85a

Browse files
committed
refactor scheduler
1 parent 9f4ea7e commit 7feb85a

17 files changed

Lines changed: 443 additions & 152 deletions

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,7 @@ data/postgres/
3838
data/redis/
3939

4040
# Binaries directory
41-
bin/
41+
bin/
42+
43+
# Logs directory
44+
logs/

internal/handlers/event_handler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ func timePtr(t time.Time) *time.Time {
2828
func TestEventHandler(t *testing.T) {
2929
db := testutils.TestDB(t)
3030
logger := zap.NewNop()
31-
eventRepo := repository.NewEventRepository(db, logger)
31+
redisClient := testutils.TestRedis(t)
32+
eventRepo := repository.NewEventRepository(db, logger, redisClient)
3233
handler := NewEventHandler(eventRepo)
3334

3435
cleanup := func() {

internal/handlers/occurrence_handler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import (
2424
func TestOccurrenceHandler(t *testing.T) {
2525
db := testutils.TestDB(t)
2626
logger := zap.NewNop()
27-
eventRepo := repository.NewEventRepository(db, logger)
27+
redisClient := testutils.TestRedis(t)
28+
eventRepo := repository.NewEventRepository(db, logger, redisClient)
2829
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
2930
handler := NewOccurrenceHandler(eventRepo, occurrenceRepo)
3031

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package integration
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"testing"
7+
"time"
8+
9+
"github.com/feedloop/qhronos/internal/models"
10+
"github.com/feedloop/qhronos/internal/repository"
11+
"github.com/feedloop/qhronos/internal/testutils"
12+
"github.com/google/uuid"
13+
"github.com/lib/pq"
14+
"github.com/redis/go-redis/v9"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
"go.uber.org/zap"
18+
)
19+
20+
func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
21+
ctx := context.Background()
22+
redisClient := testutils.TestRedis(t)
23+
redisClient.FlushAll(ctx)
24+
logger := zap.NewNop()
25+
// Use a real DB for event repo, but test Redis
26+
db := testutils.TestDB(t)
27+
repo := repository.NewEventRepository(db, logger, redisClient)
28+
29+
// Create event and schedule occurrences in Redis (inline logic)
30+
event := &models.Event{
31+
ID: uuid.New(),
32+
Name: "Event for Redis Cleanup",
33+
Description: "Test",
34+
StartTime: time.Now(),
35+
WebhookURL: "http://example.com",
36+
Status: models.EventStatusActive,
37+
Metadata: []byte(`{"key": "value"}`),
38+
Tags: pq.StringArray{"test"},
39+
CreatedAt: time.Now(),
40+
}
41+
err := repo.Create(ctx, event)
42+
require.NoError(t, err)
43+
44+
occurrences := []*models.Occurrence{
45+
{
46+
OccurrenceID: uuid.New(),
47+
EventID: event.ID,
48+
ScheduledAt: time.Now().Add(1 * time.Hour),
49+
Status: models.OccurrenceStatusPending,
50+
AttemptCount: 0,
51+
Timestamp: time.Now(),
52+
},
53+
{
54+
OccurrenceID: uuid.New(),
55+
EventID: event.ID,
56+
ScheduledAt: time.Now().Add(2 * time.Hour),
57+
Status: models.OccurrenceStatusPending,
58+
AttemptCount: 0,
59+
Timestamp: time.Now(),
60+
},
61+
}
62+
for _, occ := range occurrences {
63+
data, err := json.Marshal(occ)
64+
require.NoError(t, err)
65+
score := float64(occ.ScheduledAt.UnixMilli())
66+
_, err = redisClient.ZAdd(ctx, "schedule:events", redis.Z{
67+
Score: score,
68+
Member: string(data),
69+
}).Result()
70+
require.NoError(t, err)
71+
}
72+
73+
// Ensure occurrences are in Redis
74+
results, err := redisClient.ZRange(ctx, "schedule:events", 0, -1).Result()
75+
require.NoError(t, err)
76+
assert.NotEmpty(t, results)
77+
78+
// Delete event
79+
err = repo.Delete(ctx, event.ID)
80+
require.NoError(t, err)
81+
82+
// Ensure occurrences for this event are removed from Redis
83+
results, err = redisClient.ZRange(ctx, "schedule:events", 0, -1).Result()
84+
require.NoError(t, err)
85+
for _, res := range results {
86+
var occ models.Occurrence
87+
err := json.Unmarshal([]byte(res), &occ)
88+
if err == nil {
89+
assert.NotEqual(t, event.ID, occ.EventID, "Occurrence for deleted event should be removed from Redis")
90+
}
91+
}
92+
}

internal/models/occurrence.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"time"
55

66
"github.com/google/uuid"
7+
"github.com/lib/pq"
8+
"gorm.io/datatypes"
79
)
810

911
type OccurrenceStatus string
@@ -45,3 +47,14 @@ type PaginatedResponse struct {
4547
Total int `json:"total"`
4648
} `json:"pagination"`
4749
}
50+
51+
// Schedule is used for storing scheduled events in Redis with all event fields (no prefix)
52+
type Schedule struct {
53+
Occurrence
54+
Name string `json:"name"`
55+
Description string `json:"description"`
56+
WebhookURL string `json:"webhook_url"`
57+
Metadata datatypes.JSON `json:"metadata"`
58+
Tags pq.StringArray `json:"tags"`
59+
// Add more event fields here if needed for dispatch
60+
}

internal/repository/event_repository.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@ import (
1111
"github.com/google/uuid"
1212
"github.com/jmoiron/sqlx"
1313
"github.com/lib/pq"
14+
"github.com/redis/go-redis/v9"
1415
"go.uber.org/zap"
1516
)
1617

1718
type EventRepository struct {
1819
db *sqlx.DB
1920
logger *zap.Logger
21+
redis *redis.Client
2022
}
2123

22-
func NewEventRepository(db *sqlx.DB, logger *zap.Logger) *EventRepository {
23-
return &EventRepository{db: db, logger: logger}
24+
func NewEventRepository(db *sqlx.DB, logger *zap.Logger, redis *redis.Client) *EventRepository {
25+
return &EventRepository{db: db, logger: logger, redis: redis}
2426
}
2527

2628
func timePtr(t time.Time) *time.Time {
@@ -161,6 +163,33 @@ func (r *EventRepository) Delete(ctx context.Context, id uuid.UUID) error {
161163
return fmt.Errorf("error deleting event: %w", err)
162164
}
163165

166+
// Remove all scheduled occurrences for this event from Redis
167+
err = r.removeEventOccurrencesFromRedis(ctx, id)
168+
if err != nil {
169+
r.logger.Warn("Failed to remove event occurrences from Redis", zap.String("event_id", id.String()), zap.Error(err))
170+
}
171+
172+
// Remove recurring event from Redis (if present)
173+
_ = r.redis.HDel(ctx, "recurring:events", id.String()).Err()
174+
175+
return nil
176+
}
177+
178+
// removeEventOccurrencesFromRedis removes all scheduled occurrences for an event from Redis
179+
func (r *EventRepository) removeEventOccurrencesFromRedis(ctx context.Context, eventID uuid.UUID) error {
180+
results, err := r.redis.ZRange(ctx, "schedule:events", 0, -1).Result()
181+
if err != nil {
182+
return err
183+
}
184+
for _, res := range results {
185+
var occ models.Occurrence
186+
if err := json.Unmarshal([]byte(res), &occ); err != nil {
187+
continue // skip invalid
188+
}
189+
if occ.EventID == eventID {
190+
r.redis.ZRem(ctx, "schedule:events", res)
191+
}
192+
}
164193
return nil
165194
}
166195

internal/repository/event_repository_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ func setupTestDB(t *testing.T) *sqlx.DB {
3131
func TestEventRepository(t *testing.T) {
3232
db := testutils.TestDB(t)
3333
logger := zap.NewNop()
34-
repo := NewEventRepository(db, logger)
34+
redisClient := testutils.TestRedis(t)
35+
repo := NewEventRepository(db, logger, redisClient)
3536

3637
// Add cleanup function
3738
cleanup := func() {
@@ -396,7 +397,8 @@ func TestDeleteOldEvents(t *testing.T) {
396397
defer db.Close()
397398

398399
logger := zap.NewNop()
399-
repo := NewEventRepository(db, logger)
400+
redisClient := testutils.TestRedis(t)
401+
repo := NewEventRepository(db, logger, redisClient)
400402
ctx := context.Background()
401403

402404
// Create test data
@@ -464,7 +466,8 @@ func TestDeleteOldOccurrences(t *testing.T) {
464466
defer db.Close()
465467

466468
logger := zap.NewNop()
467-
repo := NewEventRepository(db, logger)
469+
redisClient := testutils.TestRedis(t)
470+
repo := NewEventRepository(db, logger, redisClient)
468471
ctx := context.Background()
469472

470473
// Create test event
@@ -540,6 +543,7 @@ func TestArchiveOldData(t *testing.T) {
540543
ctx := context.Background()
541544

542545
logger := zap.NewNop()
546+
redisClient := testutils.TestRedis(t)
543547

544548
// Insert an old event and occurrence
545549
oldEvent := &models.Event{
@@ -554,7 +558,7 @@ func TestArchiveOldData(t *testing.T) {
554558
Status: models.EventStatusActive,
555559
CreatedAt: time.Now().Add(-48 * time.Hour),
556560
}
557-
eventRepo := NewEventRepository(db, logger)
561+
eventRepo := NewEventRepository(db, logger, redisClient)
558562
err := eventRepo.Create(ctx, oldEvent)
559563
require.NoError(t, err)
560564

internal/repository/occurrence_repository_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import (
1717
func TestOccurrenceRepository(t *testing.T) {
1818
db := testutils.TestDB(t)
1919
logger := zap.NewNop()
20-
eventRepo := NewEventRepository(db, logger)
20+
redisClient := testutils.TestRedis(t)
21+
eventRepo := NewEventRepository(db, logger, redisClient)
2122
repo := NewOccurrenceRepository(db, logger)
2223

2324
cleanup := func() {

internal/scheduler/archiver_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
func TestArchivalScheduler(t *testing.T) {
2121
db := testutils.TestDB(t)
2222
logger := zap.NewNop()
23+
redisClient := testutils.TestRedis(t)
2324
ctx := context.Background()
2425

2526
// Clean up tables before test
@@ -39,7 +40,7 @@ func TestArchivalScheduler(t *testing.T) {
3940
Status: models.EventStatusActive,
4041
CreatedAt: time.Now().Add(-48 * time.Hour),
4142
}
42-
eventRepo := repository.NewEventRepository(db, logger)
43+
eventRepo := repository.NewEventRepository(db, logger, redisClient)
4344
err = eventRepo.Create(ctx, oldEvent)
4445
require.NoError(t, err)
4546

internal/scheduler/dispatcher.go

Lines changed: 23 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -75,25 +75,16 @@ func (d *Dispatcher) retryWithBackoff(ctx context.Context, operation func() erro
7575
return lastErr
7676
}
7777

78-
// DispatchWebhook sends a webhook request and handles retries
79-
func (d *Dispatcher) DispatchWebhook(ctx context.Context, occurrence *models.Occurrence, event *models.Event) error {
80-
// Verify event exists in database
81-
dbEvent, err := d.eventRepo.GetByID(ctx, event.ID)
82-
if err != nil {
83-
return fmt.Errorf("error getting event: %w", err)
84-
}
85-
if dbEvent == nil {
86-
return fmt.Errorf("event not found")
87-
}
88-
78+
// DispatchWebhook sends a webhook request and handles retries using a Schedule object
79+
func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule) error {
8980
// Prepare webhook payload with rich information
9081
payload := map[string]interface{}{
91-
"event_id": event.ID,
92-
"occurrence_id": occurrence.ID,
93-
"name": event.Name,
94-
"description": event.Description,
95-
"scheduled_at": occurrence.ScheduledAt,
96-
"metadata": event.Metadata,
82+
"event_id": sched.EventID,
83+
"occurrence_id": sched.OccurrenceID,
84+
"name": sched.Name,
85+
"description": sched.Description,
86+
"scheduled_at": sched.ScheduledAt,
87+
"metadata": sched.Metadata,
9788
}
9889

9990
// Convert payload to JSON
@@ -103,7 +94,7 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, occurrence *models.Occ
10394
}
10495

10596
// Create base request (will be cloned for each attempt)
106-
baseReq, err := http.NewRequestWithContext(ctx, "POST", event.WebhookURL, nil)
97+
baseReq, err := http.NewRequestWithContext(ctx, "POST", sched.WebhookURL, nil)
10798
if err != nil {
10899
return fmt.Errorf("error creating request: %w", err)
109100
}
@@ -112,12 +103,10 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, occurrence *models.Occ
112103
baseReq.Header.Set("Content-Type", "application/json")
113104
baseReq.Header.Set("Content-Length", fmt.Sprintf("%d", len(jsonPayload)))
114105

115-
// Sign request if HMAC is enabled
106+
// Sign request if HMAC is enabled (not supported if secret is not present)
116107
if d.hmacService != nil {
117-
var secret string
118-
if event.HMACSecret != nil {
119-
secret = *event.HMACSecret
120-
}
108+
// No HMACSecret in Schedule, so skip or use default
109+
secret := ""
121110
signature := d.hmacService.SignPayload(jsonPayload, secret)
122111
baseReq.Header.Set("X-Qhronos-Signature", signature)
123112
}
@@ -181,9 +170,9 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, occurrence *models.Occ
181170

182171
// Log the result to Postgres as a new occurrence record (append-only, for history)
183172
logOccurrence := &models.Occurrence{
184-
OccurrenceID: occurrence.OccurrenceID,
185-
EventID: occurrence.EventID,
186-
ScheduledAt: occurrence.ScheduledAt,
173+
OccurrenceID: sched.OccurrenceID,
174+
EventID: sched.EventID,
175+
ScheduledAt: sched.ScheduledAt,
187176
Status: finalStatus,
188177
AttemptCount: attemptCount,
189178
Timestamp: lastAttempt,
@@ -196,7 +185,7 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, occurrence *models.Occ
196185
return err
197186
}
198187

199-
// Run processes due events and dispatches webhooks
188+
// Run processes due schedules and dispatches webhooks
200189
func (d *Dispatcher) Run(ctx context.Context, scheduler *Scheduler) error {
201190
d.logger.Info("Starting dispatcher")
202191
ticker := time.NewTicker(1 * time.Second)
@@ -208,32 +197,18 @@ func (d *Dispatcher) Run(ctx context.Context, scheduler *Scheduler) error {
208197
d.logger.Info("Dispatcher shutting down")
209198
return ctx.Err()
210199
case <-ticker.C:
211-
// Get due occurrences
212-
occurrences, err := scheduler.GetDueOccurrence(ctx)
200+
// Get due schedules
201+
schedules, err := scheduler.GetDueSchedules(ctx)
213202
if err != nil {
214-
d.logger.Error("Error getting due occurrences", zap.Error(err))
203+
d.logger.Error("Error getting due schedules", zap.Error(err))
215204
continue
216205
}
217206

218-
// Process each occurrence
219-
for _, occurrence := range occurrences {
220-
event, err := d.eventRepo.GetByID(ctx, occurrence.EventID)
221-
if err != nil {
222-
d.logger.Error("Error getting event for occurrence",
223-
zap.Int("occurrence_id", occurrence.ID),
224-
zap.Error(err))
225-
continue
226-
}
227-
if event == nil {
228-
d.logger.Warn("Event not found for occurrence",
229-
zap.Int("occurrence_id", occurrence.ID))
230-
continue
231-
}
232-
233-
// Dispatch webhook
234-
if err := d.DispatchWebhook(ctx, occurrence, event); err != nil {
207+
// Process each schedule
208+
for _, sched := range schedules {
209+
if err := d.DispatchWebhook(ctx, sched); err != nil {
235210
d.logger.Error("Error dispatching webhook",
236-
zap.Int("occurrence_id", occurrence.ID),
211+
zap.String("occurrence_id", sched.OccurrenceID.String()),
237212
zap.Error(err))
238213
continue
239214
}

0 commit comments

Comments
 (0)