Skip to content

Commit fd846fd

Browse files
authored
Merge pull request #1 from feedloop/feature/schedule-minute-hour-support
Feature/schedule minute hour support
2 parents 29a2931 + cf1d847 commit fd846fd

18 files changed

Lines changed: 169 additions & 105 deletions

Makefile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ help:
1212
@echo "Available targets:"
1313
@echo " build Build the qhronosd binary"
1414
@echo " clean Remove the qhronosd binary"
15-
@echo " test Run the test script (requires docker-up)"
15+
@echo " test [name] Run the test script (requires docker-up). Optionally pass a test or subtest name, e.g. 'make test TestScheduler/successful scheduling'"
1616
@echo " migrate-up Run migrations up using scripts/migrate.sh"
1717
@echo " migrate-down Run migrations down using scripts/migrate.sh"
1818
@echo " migrate-clean-slate Drop and recreate the database, then run all migrations from scratch"
@@ -28,7 +28,7 @@ clean:
2828
rm -f $(BINARY_NAME)
2929

3030
test:
31-
bash scripts/test.sh
31+
bash scripts/test.sh $(filter-out $@,$(MAKECMDGOALS))
3232

3333
migrate-up:
3434
bash scripts/migrate.sh up
@@ -56,3 +56,6 @@ redis-cleanup:
5656

5757
docker-qup:
5858
docker compose up -d postgres redis qhronosd
59+
60+
%::
61+
@:

internal/handlers/event_handler_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ func TestEventHandler(t *testing.T) {
3030
db := testutils.TestDB(t)
3131
logger := zap.NewNop()
3232
redisClient := testutils.TestRedis(t)
33-
eventRepo := repository.NewEventRepository(db, logger, redisClient)
33+
// Use test namespace for Redis keys in tests
34+
namespace := testutils.GetRedisNamespace()
35+
eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace)
3436
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
35-
schedulerService := scheduler.NewScheduler(redisClient, logger)
37+
schedulerService := scheduler.NewScheduler(redisClient, logger, namespace)
3638
expander := scheduler.NewExpander(
3739
schedulerService,
3840
eventRepo,

internal/handlers/occurrence_handler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ func TestOccurrenceHandler(t *testing.T) {
2525
db := testutils.TestDB(t)
2626
logger := zap.NewNop()
2727
redisClient := testutils.TestRedis(t)
28-
eventRepo := repository.NewEventRepository(db, logger, redisClient)
28+
namespace := testutils.GetRedisNamespace()
29+
eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace)
2930
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
3031
handler := NewOccurrenceHandler(eventRepo, occurrenceRepo)
3132

internal/integration/event_redis_cleanup_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
2525
logger := zap.NewNop()
2626
// Use a real DB for event repo, but test Redis
2727
db := testutils.TestDB(t)
28-
repo := repository.NewEventRepository(db, logger, redisClient)
28+
namespace := testutils.GetRedisNamespace()
29+
repo := repository.NewEventRepository(db, logger, redisClient, namespace)
2930

3031
// Create event and schedule occurrences in Redis (inline logic)
3132
event := &models.Event{
@@ -65,17 +66,17 @@ func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
6566
require.NoError(t, err)
6667
key := "schedule:" + event.ID.String() + ":" + fmt.Sprintf("%d", occ.ScheduledAt.Unix())
6768
score := float64(occ.ScheduledAt.UnixMilli())
68-
_, err = redisClient.ZAdd(ctx, "schedules", redis.Z{
69+
_, err = redisClient.ZAdd(ctx, namespace+"schedules", redis.Z{
6970
Score: score,
7071
Member: key,
7172
}).Result()
7273
require.NoError(t, err)
73-
_, err = redisClient.HSet(ctx, "schedule:data", key, data).Result()
74+
_, err = redisClient.HSet(ctx, namespace+"schedule:data", key, data).Result()
7475
require.NoError(t, err)
7576
}
7677

7778
// Ensure occurrences are in Redis
78-
results, err := redisClient.ZRange(ctx, "schedules", 0, -1).Result()
79+
results, err := redisClient.ZRange(ctx, namespace+"schedules", 0, -1).Result()
7980
require.NoError(t, err)
8081
assert.NotEmpty(t, results)
8182

@@ -84,11 +85,11 @@ func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
8485
require.NoError(t, err)
8586

8687
// Ensure occurrences for this event are removed from Redis
87-
results, err = redisClient.ZRange(ctx, "schedules", 0, -1).Result()
88+
results, err = redisClient.ZRange(ctx, namespace+"schedules", 0, -1).Result()
8889
require.NoError(t, err)
8990
for _, res := range results {
9091
// Fetch from hash instead of decoding directly
91-
data, err := redisClient.HGet(ctx, "schedule:data", res).Result()
92+
data, err := redisClient.HGet(ctx, namespace+"schedule:data", res).Result()
9293
if err != nil {
9394
continue // skip if not found
9495
}

internal/repository/event_repository.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ import (
1616
)
1717

1818
type EventRepository struct {
19-
db *sqlx.DB
20-
logger *zap.Logger
21-
redis *redis.Client
19+
db *sqlx.DB
20+
logger *zap.Logger
21+
redis *redis.Client
22+
redisPrefix string
2223
}
2324

24-
func NewEventRepository(db *sqlx.DB, logger *zap.Logger, redis *redis.Client) *EventRepository {
25-
return &EventRepository{db: db, logger: logger, redis: redis}
25+
func NewEventRepository(db *sqlx.DB, logger *zap.Logger, redis *redis.Client, prefix string) *EventRepository {
26+
return &EventRepository{db: db, logger: logger, redis: redis, redisPrefix: prefix}
2627
}
2728

2829
func timePtr(t time.Time) *time.Time {
@@ -174,13 +175,13 @@ func (r *EventRepository) Delete(ctx context.Context, id uuid.UUID) error {
174175

175176
// RemoveEventOccurrencesFromRedis removes all scheduled occurrences for an event from Redis
176177
func (r *EventRepository) RemoveEventOccurrencesFromRedis(ctx context.Context, eventID uuid.UUID) error {
177-
results, err := r.redis.ZRange(ctx, "schedules", 0, -1).Result()
178+
results, err := r.redis.ZRange(ctx, r.redisPrefix+"schedules", 0, -1).Result()
178179
if err != nil {
179180
return err
180181
}
181182
for _, key := range results {
182183
// Fetch the schedule data from the hash
183-
data, err := r.redis.HGet(ctx, "schedule:data", key).Result()
184+
data, err := r.redis.HGet(ctx, r.redisPrefix+"schedule:data", key).Result()
184185
if err != nil {
185186
continue // skip if not found or error
186187
}
@@ -192,8 +193,8 @@ func (r *EventRepository) RemoveEventOccurrencesFromRedis(ctx context.Context, e
192193
}
193194
if sched.EventID == eventID {
194195
// Remove from both sorted set and hash
195-
r.redis.ZRem(ctx, "schedules", key)
196-
r.redis.HDel(ctx, "schedule:data", key)
196+
r.redis.ZRem(ctx, r.redisPrefix+"schedules", key)
197+
r.redis.HDel(ctx, r.redisPrefix+"schedule:data", key)
197198
}
198199
}
199200
return nil

internal/repository/event_repository_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ func TestEventRepository(t *testing.T) {
3232
db := testutils.TestDB(t)
3333
logger := zap.NewNop()
3434
redisClient := testutils.TestRedis(t)
35-
repo := NewEventRepository(db, logger, redisClient)
35+
namespace := testutils.GetRedisNamespace()
36+
repo := NewEventRepository(db, logger, redisClient, namespace)
3637

3738
// Add cleanup function
3839
cleanup := func() {
@@ -398,7 +399,8 @@ func TestDeleteOldEvents(t *testing.T) {
398399

399400
logger := zap.NewNop()
400401
redisClient := testutils.TestRedis(t)
401-
repo := NewEventRepository(db, logger, redisClient)
402+
namespace := testutils.GetRedisNamespace()
403+
repo := NewEventRepository(db, logger, redisClient, namespace)
402404
ctx := context.Background()
403405

404406
// Create test data
@@ -467,7 +469,8 @@ func TestDeleteOldOccurrences(t *testing.T) {
467469

468470
logger := zap.NewNop()
469471
redisClient := testutils.TestRedis(t)
470-
repo := NewEventRepository(db, logger, redisClient)
472+
namespace := testutils.GetRedisNamespace()
473+
repo := NewEventRepository(db, logger, redisClient, namespace)
471474
ctx := context.Background()
472475

473476
// Create test event
@@ -544,6 +547,7 @@ func TestArchiveOldData(t *testing.T) {
544547

545548
logger := zap.NewNop()
546549
redisClient := testutils.TestRedis(t)
550+
namespace := testutils.GetRedisNamespace()
547551

548552
// Insert an old event and occurrence
549553
oldEvent := &models.Event{
@@ -558,7 +562,7 @@ func TestArchiveOldData(t *testing.T) {
558562
Status: models.EventStatusActive,
559563
CreatedAt: time.Now().Add(-48 * time.Hour),
560564
}
561-
eventRepo := NewEventRepository(db, logger, redisClient)
565+
eventRepo := NewEventRepository(db, logger, redisClient, namespace)
562566
err := eventRepo.Create(ctx, oldEvent)
563567
require.NoError(t, err)
564568

internal/repository/occurrence_repository_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ func TestOccurrenceRepository(t *testing.T) {
1818
db := testutils.TestDB(t)
1919
logger := zap.NewNop()
2020
redisClient := testutils.TestRedis(t)
21-
eventRepo := NewEventRepository(db, logger, redisClient)
21+
namespace := testutils.GetRedisNamespace()
22+
eventRepo := NewEventRepository(db, logger, redisClient, namespace)
2223
repo := NewOccurrenceRepository(db, logger)
2324

2425
cleanup := func() {

internal/scheduler/archiver_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ func TestArchivalScheduler(t *testing.T) {
4141
Status: models.EventStatusActive,
4242
CreatedAt: time.Now().Add(-48 * time.Hour),
4343
}
44-
eventRepo := repository.NewEventRepository(db, logger, redisClient)
44+
namespace := testutils.GetRedisNamespace()
45+
eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace)
4546
err = eventRepo.Create(ctx, oldEvent)
4647
require.NoError(t, err)
4748

internal/scheduler/dispatcher.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type Dispatcher struct {
3535
logger *zap.Logger
3636
clientNotifier ClientNotifier // optional, for q: webhooks
3737
scheduler *Scheduler // new field for scheduler
38+
redisPrefix string // added redisPrefix field
3839
}
3940

4041
// HTTPClient interface for mocking HTTP requests
@@ -62,6 +63,7 @@ func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *reposi
6263
logger: logger,
6364
clientNotifier: clientNotifier,
6465
scheduler: scheduler,
66+
redisPrefix: scheduler.redisPrefix,
6567
}
6668
}
6769

@@ -77,8 +79,8 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
7779
if err != nil || event == nil {
7880
key := fmt.Sprintf("schedule:%s:%d", sched.EventID.String(), sched.ScheduledAt.Unix())
7981
if d.scheduler != nil && d.scheduler.redis != nil {
80-
_, zremErr := d.scheduler.redis.ZRem(ctx, ScheduleKey, key).Result()
81-
_, hdelErr := d.scheduler.redis.HDel(ctx, "schedule:data", key).Result()
82+
_, zremErr := d.scheduler.redis.ZRem(ctx, d.scheduler.redisPrefix+ScheduleKey, key).Result()
83+
_, hdelErr := d.scheduler.redis.HDel(ctx, d.scheduler.redisPrefix+"schedule:data", key).Result()
8284
d.logger.Warn("Orphaned schedule found and removed",
8385
zap.String("event_id", sched.EventID.String()),
8486
zap.String("schedule_key", key),
@@ -245,7 +247,7 @@ return due
245247
case <-ticker.C:
246248
now := fmt.Sprintf("%f", float64(time.Now().Unix()))
247249
// Use Lua script for atomic move
248-
res, err := scheduler.redis.Eval(ctx, retryLua, []string{retryQueueKey, dispatchQueueKey}, now).Result()
250+
res, err := scheduler.redis.Eval(ctx, retryLua, []string{d.scheduler.redisPrefix + retryQueueKey, d.scheduler.redisPrefix + dispatchQueueKey}, now).Result()
249251
if err != nil {
250252
d.logger.Error("[RETRY POLLER] Lua script failed", zap.Error(err))
251253
continue
@@ -267,7 +269,7 @@ return due
267269
d.logger.Debug("[DISPATCHER] Worker waiting for item", zap.Int("worker_id", workerID), zap.Time("ts", itemStart))
268270
// Pop from dispatch queue (no processing queue)
269271
popStart := time.Now()
270-
data, err := scheduler.redis.BRPop(ctx, 5*time.Second, dispatchQueueKey).Result()
272+
data, err := scheduler.redis.BRPop(ctx, 5*time.Second, d.scheduler.redisPrefix+dispatchQueueKey).Result()
271273
popEnd := time.Now()
272274
d.logger.Debug("[DISPATCHER] BRPop duration", zap.Int("worker_id", workerID), zap.Duration("duration", popEnd.Sub(popStart)), zap.Error(err))
273275
if err == redis.Nil {
@@ -312,7 +314,7 @@ return due
312314
nextRetry := time.Now().Add(d.retryDelay).Unix()
313315
updatedData, _ := json.Marshal(sched)
314316
d.logger.Debug("[DISPATCHER] Before ZAdd to retry queue", zap.Int("worker_id", workerID), zap.Time("ts", zaddStart))
315-
err := scheduler.redis.ZAdd(ctx, retryQueueKey, redis.Z{
317+
err := scheduler.redis.ZAdd(ctx, d.scheduler.redisPrefix+retryQueueKey, redis.Z{
316318
Score: float64(nextRetry),
317319
Member: updatedData,
318320
}).Err()

0 commit comments

Comments
 (0)