Skip to content

Commit d7e3694

Browse files
committed
fix dockerfile, fix archiver
1 parent 885051e commit d7e3694

6 files changed

Lines changed: 81 additions & 40 deletions

File tree

Dockerfile

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# syntax=docker/dockerfile:1
22

33
# Build stage
4-
FROM golang:1.23 as builder
4+
FROM golang:1.23 AS builder
55
WORKDIR /app
66
COPY . .
7-
RUN go build -o qhronosd ./cmd
7+
RUN make
88

99
# Run stage
1010
FROM debian:bullseye-slim
1111
WORKDIR /app
12-
COPY --from=builder /app/qhronosd .
13-
COPY config.example.yaml ./config.yaml
12+
COPY --from=builder /app/bin/qhronosd .
13+
COPY config.example.yml ./config.yml
1414
EXPOSE 8080
15-
CMD ["./qhronosd"]
15+
CMD ["./qhronosd", "-c", "config.yml"]

internal/scheduler/archiver.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
package scheduler
22

33
import (
4-
"database/sql"
4+
"context"
55
"encoding/json"
66
"time"
77

88
"github.com/feedloop/qhronos/internal/config"
99
"github.com/jmoiron/sqlx"
10+
"github.com/redis/go-redis/v9"
1011
"go.uber.org/zap"
1112
)
1213

1314
const (
14-
archiveLockKey = 12345 // Arbitrary unique key for advisory lock
15+
archiveLockKey = 12345 // Arbitrary unique key for advisory lock
16+
lastArchivalTimeKey = "archiver:last_archival_time"
1517
)
1618

1719
func acquireArchiveLock(db *sqlx.DB) (bool, error) {
@@ -25,28 +27,32 @@ func releaseArchiveLock(db *sqlx.DB) error {
2527
return err
2628
}
2729

28-
func shouldArchive(db *sqlx.DB, checkPeriod time.Duration) (bool, error) {
29-
var lastTime time.Time
30-
err := db.Get(&lastTime, "SELECT value::timestamptz FROM system_config WHERE key = 'last_archival_time'")
31-
if err != nil {
32-
if err == sql.ErrNoRows {
33-
return true, nil // Never archived before
30+
func shouldArchiveRedis(ctx context.Context, rdb *redis.Client, checkPeriod time.Duration) (bool, error) {
31+
val, err := rdb.Get(ctx, lastArchivalTimeKey).Result()
32+
if err == redis.Nil {
33+
// Not set yet, set to now + checkPeriod
34+
nextTime := time.Now().Add(checkPeriod)
35+
err := rdb.Set(ctx, lastArchivalTimeKey, nextTime.Format(time.RFC3339), 0).Err()
36+
if err != nil {
37+
return false, err
3438
}
39+
return true, nil // Should archive now
40+
} else if err != nil {
41+
return false, err
42+
}
43+
lastTime, err := time.Parse(time.RFC3339, val)
44+
if err != nil {
3545
return false, err
3646
}
37-
if time.Since(lastTime) < checkPeriod {
47+
if time.Now().Before(lastTime) {
3848
return false, nil
3949
}
4050
return true, nil
4151
}
4252

43-
func updateLastArchivalTime(db *sqlx.DB) error {
44-
_, err := db.Exec(`
45-
INSERT INTO system_config (key, value, description, updated_by)
46-
VALUES ('last_archival_time', now(), 'Last archival run', 'system')
47-
ON CONFLICT (key) DO UPDATE SET value = now(), updated_at = now(), updated_by = 'system'
48-
`)
49-
return err
53+
func updateLastArchivalTimeRedis(ctx context.Context, rdb *redis.Client, checkPeriod time.Duration) error {
54+
nextTime := time.Now().Add(checkPeriod)
55+
return rdb.Set(ctx, lastArchivalTimeKey, nextTime.Format(time.RFC3339), 0).Err()
5056
}
5157

5258
func syncRetentionConfigToDB(db *sqlx.DB, durations config.RetentionDurations) error {
@@ -70,12 +76,13 @@ func syncRetentionConfigToDB(db *sqlx.DB, durations config.RetentionDurations) e
7076
return err
7177
}
7278

73-
func StartArchivalScheduler(db *sqlx.DB, checkPeriod time.Duration, durations config.RetentionDurations, stopCh <-chan struct{}, logger *zap.Logger) {
79+
func StartArchivalScheduler(db *sqlx.DB, rdb *redis.Client, checkPeriod time.Duration, durations config.RetentionDurations, stopCh <-chan struct{}, logger *zap.Logger) {
7480
// Sync retention config on startup
7581
if err := syncRetentionConfigToDB(db, durations); err != nil {
7682
logger.Error("[archiver] Failed to sync retention config", zap.Error(err))
7783
}
7884
ticker := time.NewTicker(checkPeriod)
85+
ctx := context.Background()
7986
go func() {
8087
defer ticker.Stop()
8188
for {
@@ -90,7 +97,7 @@ func StartArchivalScheduler(db *sqlx.DB, checkPeriod time.Duration, durations co
9097
logger.Debug("[archiver] Another instance is archiving, skipping this cycle.")
9198
continue
9299
}
93-
shouldRun, err := shouldArchive(db, checkPeriod)
100+
shouldRun, err := shouldArchiveRedis(ctx, rdb, checkPeriod)
94101
if err != nil {
95102
logger.Error("[archiver] Error checking last archival time", zap.Error(err))
96103
releaseArchiveLock(db)
@@ -107,7 +114,7 @@ func StartArchivalScheduler(db *sqlx.DB, checkPeriod time.Duration, durations co
107114
releaseArchiveLock(db)
108115
continue
109116
}
110-
if err := updateLastArchivalTime(db); err != nil {
117+
if err := updateLastArchivalTimeRedis(ctx, rdb, checkPeriod); err != nil {
111118
logger.Error("[archiver] Error updating last archival time", zap.Error(err))
112119
}
113120
logger.Info("[archiver] Archival completed successfully.")

internal/scheduler/archiver_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/feedloop/qhronos/internal/testutils"
1212
"github.com/google/uuid"
1313
"github.com/lib/pq"
14+
"github.com/redis/go-redis/v9"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
"go.uber.org/zap"
@@ -71,7 +72,12 @@ func TestArchivalScheduler(t *testing.T) {
7172
durations, err := (&config.Config{Retention: retention}).ParseRetentionDurations()
7273
require.NoError(t, err)
7374
checkPeriod := 2 * time.Second
74-
StartArchivalScheduler(db, checkPeriod, *durations, archivalStopCh, logger)
75+
76+
// Check that last_archival_time is not set in Redis initially
77+
val, err := redisClient.Get(ctx, "archiver:last_archival_time").Result()
78+
assert.ErrorIs(t, err, redis.Nil)
79+
80+
StartArchivalScheduler(db, redisClient, checkPeriod, *durations, archivalStopCh, logger)
7581

7682
// Also call the archival function directly for immediate effect
7783
_, err = db.ExecContext(ctx, "SELECT archive_old_data($1)", "24 hours")
@@ -81,6 +87,12 @@ func TestArchivalScheduler(t *testing.T) {
8187
time.Sleep(3 * time.Second)
8288
close(archivalStopCh)
8389

90+
// Check that last_archival_time is now set in Redis
91+
val, err = redisClient.Get(ctx, "archiver:last_archival_time").Result()
92+
assert.NoError(t, err)
93+
_, err = time.Parse(time.RFC3339, val)
94+
assert.NoError(t, err)
95+
8496
// Check that the old event and occurrence are gone from main tables
8597
e, err := eventRepo.GetByID(ctx, oldEvent.ID)
8698
assert.Nil(t, e)

internal/scheduler/dispatcher.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Dispatcher struct {
3434
retryDelay time.Duration
3535
logger *zap.Logger
3636
clientNotifier ClientNotifier // optional, for q: webhooks
37+
scheduler *Scheduler // new field for scheduler
3738
}
3839

3940
// HTTPClient interface for mocking HTTP requests
@@ -50,7 +51,7 @@ func (d *DefaultHTTPClient) Do(req *http.Request) (*http.Response, error) {
5051
return d.client.Do(req)
5152
}
5253

53-
func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *repository.OccurrenceRepository, hmacService *services.HMACService, logger *zap.Logger, maxRetries int, retryDelay time.Duration, clientNotifier ClientNotifier) *Dispatcher {
54+
func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *repository.OccurrenceRepository, hmacService *services.HMACService, logger *zap.Logger, maxRetries int, retryDelay time.Duration, clientNotifier ClientNotifier, scheduler *Scheduler) *Dispatcher {
5455
return &Dispatcher{
5556
eventRepo: eventRepo,
5657
occurrenceRepo: occurrenceRepo,
@@ -60,6 +61,7 @@ func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *reposi
6061
retryDelay: retryDelay,
6162
logger: logger,
6263
clientNotifier: clientNotifier,
64+
scheduler: scheduler,
6365
}
6466
}
6567

@@ -70,6 +72,23 @@ func (d *Dispatcher) SetHTTPClient(client HTTPClient) {
7072

7173
// DispatchWebhook sends a webhook request and handles a single attempt (no in-process retry)
7274
func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule) error {
75+
// Defensive: Clean up orphaned schedule if event does not exist
76+
event, err := d.eventRepo.GetByID(ctx, sched.EventID)
77+
if err != nil || event == nil {
78+
key := fmt.Sprintf("schedule:%s:%d", sched.EventID.String(), sched.ScheduledAt.Unix())
79+
if d.scheduler != nil && d.scheduler.redis != nil {
80+
_, zremErr := d.scheduler.redis.ZRem(ctx, ScheduleKey, key).Result()
81+
_, hdelErr := d.scheduler.redis.HDel(ctx, "schedule:data", key).Result()
82+
d.logger.Warn("Orphaned schedule found and removed",
83+
zap.String("event_id", sched.EventID.String()),
84+
zap.String("schedule_key", key),
85+
zap.Error(err),
86+
zap.Error(zremErr),
87+
zap.Error(hdelErr),
88+
)
89+
}
90+
return fmt.Errorf("event not found: %s", sched.EventID)
91+
}
7392
if strings.HasPrefix(sched.Webhook, "q:") {
7493
if d.clientNotifier == nil {
7594
return fmt.Errorf("client notifier not configured for q: webhooks")
@@ -184,7 +203,7 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
184203
}
185204
_ = d.occurrenceRepo.Create(ctx, logOccurrence)
186205
// Auto-inactivate one-time events after dispatch (success or max retries)
187-
event, err := d.eventRepo.GetByID(ctx, sched.EventID)
206+
event, err = d.eventRepo.GetByID(ctx, sched.EventID)
188207
if event == nil {
189208
return fmt.Errorf("event not found: %s", sched.EventID)
190209
}

internal/scheduler/dispatcher_test.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ func TestDispatcher(t *testing.T) {
8181
hmacService := services.NewHMACService("test-secret")
8282
mockHTTP := new(MockHTTPClient)
8383

84-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil)
84+
scheduler := NewScheduler(redisClient, logger)
85+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler)
8586
dispatcher.SetHTTPClient(mockHTTP)
8687

8788
// Add cleanup function
@@ -251,7 +252,7 @@ func TestDispatcher(t *testing.T) {
251252
dispatcherStart := time.Now()
252253
mockNotifier := NewMockClientNotifier()
253254
mockNotifier.connected["client1"] = []string{"c1"}
254-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, mockNotifier)
255+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, mockNotifier, scheduler)
255256
fmt.Printf("[PROFILE] dispatcher creation: %v\n", time.Since(dispatcherStart))
256257
scheduleStart := time.Now()
257258
schedule := &models.Schedule{
@@ -280,7 +281,7 @@ func TestDispatcher(t *testing.T) {
280281
t.Run("client hook dispatch - no client connected", func(t *testing.T) {
281282
cleanup()
282283
mockNotifier := NewMockClientNotifier()
283-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 2, 1*time.Millisecond, mockNotifier)
284+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 2, 1*time.Millisecond, mockNotifier, scheduler)
284285
// Insert the event into the database
285286
event := &models.Event{
286287
ID: uuid.New(),
@@ -312,7 +313,7 @@ func TestDispatcher(t *testing.T) {
312313
err = redisClient.RPush(ctx, dispatchQueueKey, data).Err()
313314
require.NoError(t, err)
314315
// Run the worker for a short time to process retries
315-
runWorkerAndWait(ctx, dispatcher, NewScheduler(redisClient, logger), 20*time.Millisecond)
316+
runWorkerAndWait(ctx, dispatcher, scheduler, 20*time.Millisecond)
316317
// Now assert the number of calls
317318
assert.Equal(t, 3, len(mockNotifier.calls)) // 3 attempts (initial + 2 retries)
318319
})
@@ -325,7 +326,7 @@ func TestDispatcher(t *testing.T) {
325326
dispatcherStart := time.Now()
326327
mockNotifier := NewMockClientNotifier()
327328
mockNotifier.connected["client3"] = []string{"c1", "c2"}
328-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Millisecond, mockNotifier)
329+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Millisecond, mockNotifier, scheduler)
329330
fmt.Printf("[PROFILE] dispatcher creation: %v\n", time.Since(dispatcherStart))
330331
scheduleStart := time.Now()
331332
schedule := &models.Schedule{
@@ -364,11 +365,12 @@ func TestDispatcher_RedisOnlyDispatch(t *testing.T) {
364365
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
365366
hmacService := services.NewHMACService("test-secret")
366367
mockHTTP := new(MockHTTPClient)
367-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil)
368+
scheduler := NewScheduler(redisClient, logger)
369+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler)
368370
dispatcher.SetHTTPClient(mockHTTP)
369371

370372
// Create Scheduler instance
371-
scheduler := NewScheduler(redisClient, logger)
373+
scheduler = NewScheduler(redisClient, logger)
372374

373375
// Create event and schedule, schedule in Redis
374376
event := &models.Event{
@@ -437,11 +439,12 @@ func TestDispatcher_GetDueSchedules(t *testing.T) {
437439
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
438440
hmacService := services.NewHMACService("test-secret")
439441
mockHTTP := new(MockHTTPClient)
440-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil)
442+
scheduler := NewScheduler(redisClient, logger)
443+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler)
441444
dispatcher.SetHTTPClient(mockHTTP)
442445

443446
// Create Scheduler instance
444-
scheduler := NewScheduler(redisClient, logger)
447+
scheduler = NewScheduler(redisClient, logger)
445448

446449
// Create event and schedule, schedule in Redis
447450
event := &models.Event{
@@ -529,9 +532,9 @@ func TestDispatcher_DispatchQueueWorker(t *testing.T) {
529532
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
530533
hmacService := services.NewHMACService("test-secret")
531534
mockHTTP := new(MockHTTPClient)
532-
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil)
533-
dispatcher.SetHTTPClient(mockHTTP)
534535
scheduler := NewScheduler(redisClient, logger)
536+
dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler)
537+
dispatcher.SetHTTPClient(mockHTTP)
535538

536539
cleanup := func() {
537540
_, err := db.ExecContext(ctx, "TRUNCATE TABLE occurrences CASCADE")

main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ func main() {
197197
// Initialize services
198198
tokenService := services.NewTokenService(cfg.Auth.MasterToken, cfg.Auth.JWTSecret)
199199
hmacService := services.NewHMACService(cfg.HMAC.DefaultSecret)
200-
dispatcher := scheduler.NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, cfg.DispatchMaxRetries, cfg.DispatchRetryBackoff, wsHandler)
200+
dispatcher := scheduler.NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, cfg.DispatchMaxRetries, cfg.DispatchRetryBackoff, wsHandler, schedulerService)
201201

202202
// Initialize handlers
203203
eventHandler := handlers.NewEventHandler(eventRepo, expander)
@@ -232,7 +232,7 @@ func main() {
232232

233233
// Start archival scheduler in background
234234
archivalStopCh := make(chan struct{})
235-
scheduler.StartArchivalScheduler(db, cfg.Archival.CheckPeriod, *durations, archivalStopCh, logger)
235+
scheduler.StartArchivalScheduler(db, redisClient, cfg.Archival.CheckPeriod, *durations, archivalStopCh, logger)
236236

237237
// Start scheduler services in background
238238
ctx := context.Background()

0 commit comments

Comments
 (0)