Skip to content

Commit 0579ed8

Browse files
committed
fix expansion issues
1 parent 7baa348 commit 0579ed8

9 files changed

Lines changed: 99 additions & 19 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ curl -X POST http://localhost:8080/events \
125125
}'
126126
```
127127

128+
> **Note:**
129+
> - For one-time events, omit the `schedule` field and provide `start_time`.
130+
> - For recurring events, provide both `start_time` and a `schedule` object.
131+
> - The `webhook` field is required (not `webhook_url`).
132+
> - The `Authorization` header is required for all requests.
133+
128134
## Schedule Parameter Tutorial
129135

130136
The `schedule` parameter in event creation allows you to define recurring or one-time schedules using a flexible JSON structure. Here are the most common use cases:

config.example.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ server:
33

44
database:
55
host: localhost
6-
port: 5432
6+
port: 5433
77
user: postgres
88
password: postgres
99
dbname: qhronos
@@ -21,7 +21,8 @@ auth:
2121
scheduler:
2222
look_ahead_duration: 24h # How far into the future to expand recurring events
2323
expansion_interval: 5m # How often to run the expander
24-
dispatch_worker_count: 4 # Number of dispatch workers
24+
dispatch_worker_count: 1 # Number of dispatcher workers
25+
grace_period: 2m # How far into the past to look for missed events
2526

2627
archival:
2728
check_period: 1h

docs/api.md

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,34 @@ Authorization: Bearer <token>
1414

1515
### Create Event
1616
- **POST** `/events`
17-
- **Body:**
17+
- **Body (One-Time Event):**
1818
```json
1919
{
2020
"name": "My Event",
2121
"description": "Description...",
2222
"start_time": "2024-04-23T10:00:00Z",
23-
"webhook_url": "https://example.com/webhook",
23+
"webhook": "https://example.com/webhook",
2424
"metadata": {"key": "value"},
25+
"tags": ["tag1", "tag2"]
26+
}
27+
```
28+
- **Body (Recurring Event):**
29+
```json
30+
{
31+
"name": "My Recurring Event",
32+
"description": "Description...",
33+
"start_time": "2024-04-23T10:00:00Z",
34+
"webhook": "https://example.com/webhook",
2535
"schedule": {"frequency": "daily", "interval": 1},
36+
"metadata": {"key": "value"},
2637
"tags": ["tag1", "tag2"]
2738
}
2839
```
40+
- **Note:**
41+
- For one-time events, omit the `schedule` field and provide `start_time`.
42+
- For recurring events, provide both `start_time` and a `schedule` object.
43+
- The `webhook` field is required (not `webhook_url`).
44+
- The `Authorization` header is required for all requests.
2945
- **Response:** `201 Created`
3046

3147
### Get Event

internal/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type SchedulerConfig struct {
6666
LookAheadDuration time.Duration `mapstructure:"look_ahead_duration"`
6767
ExpansionInterval time.Duration `mapstructure:"expansion_interval"`
6868
DispatchWorkerCount int `mapstructure:"dispatch_worker_count"`
69+
GracePeriod time.Duration `mapstructure:"grace_period"`
6970
}
7071

7172
type RetentionConfig struct {

internal/handlers/event_handler.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package handlers
22

33
import (
4+
"context"
45
"net/http"
56
"strings"
67
"time"
78

89
"github.com/feedloop/qhronos/internal/middleware"
910
"github.com/feedloop/qhronos/internal/models"
1011
"github.com/feedloop/qhronos/internal/repository"
12+
"github.com/feedloop/qhronos/internal/scheduler"
1113
"github.com/gin-gonic/gin"
1214
"github.com/google/uuid"
1315
"github.com/lib/pq"
@@ -16,11 +18,12 @@ import (
1618
)
1719

1820
type EventHandler struct {
19-
repo *repository.EventRepository
21+
repo *repository.EventRepository
22+
expander *scheduler.Expander
2023
}
2124

22-
func NewEventHandler(repo *repository.EventRepository) *EventHandler {
23-
return &EventHandler{repo: repo}
25+
func NewEventHandler(repo *repository.EventRepository, expander *scheduler.Expander) *EventHandler {
26+
return &EventHandler{repo: repo, expander: expander}
2427
}
2528

2629
func (h *EventHandler) CreateEvent(c *gin.Context) {
@@ -77,6 +80,21 @@ func (h *EventHandler) CreateEvent(c *gin.Context) {
7780
return
7881
}
7982
logger.Info("Event created", zap.String("event_id", event.ID.String()))
83+
84+
// Immediate expansion if within window
85+
now := time.Now().UTC()
86+
graceStart := now.Add(-h.expander.GracePeriod())
87+
lookAheadEnd := now.Add(h.expander.LookAheadDuration())
88+
if event.StartTime.After(graceStart) && event.StartTime.Before(lookAheadEnd) {
89+
go func() {
90+
if event.Schedule == nil {
91+
_ = h.expander.ExpandNonRecurringEvent(context.Background(), event)
92+
} else {
93+
_ = h.expander.ExpandRecurringEvent(context.Background(), event)
94+
}
95+
}()
96+
}
97+
8098
c.JSON(http.StatusCreated, event)
8199
}
82100

@@ -182,17 +200,21 @@ func (h *EventHandler) DeleteEvent(c *gin.Context) {
182200
func (h *EventHandler) ListEventsByTags(c *gin.Context) {
183201
tagsParam := c.Query("tags")
184202
if tagsParam == "" {
185-
c.JSON(http.StatusBadRequest, gin.H{"error": "tags parameter is required"})
203+
// No tags param: return all events
204+
events, err := h.repo.List(c)
205+
if err != nil {
206+
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
207+
return
208+
}
209+
c.JSON(http.StatusOK, events)
186210
return
187211
}
188-
189212
tags := strings.Split(tagsParam, ",")
190213
events, err := h.repo.ListByTags(c, tags)
191214
if err != nil {
192215
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
193216
return
194217
}
195-
196218
c.JSON(http.StatusOK, events)
197219
}
198220

internal/scheduler/dispatcher.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
183183
}
184184
_ = d.occurrenceRepo.Create(ctx, logOccurrence) // Ignore error to avoid blocking delivery
185185

186+
// Auto-inactivate one-time events after dispatch (success or max retries)
187+
event, err := d.eventRepo.GetByID(ctx, sched.EventID)
188+
if err == nil && event != nil && event.Schedule == nil && event.Status == "active" {
189+
event.Status = "inactive"
190+
_ = d.eventRepo.Update(ctx, event)
191+
}
192+
186193
return err
187194
}
188195

internal/scheduler/expander.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Expander struct {
1717
occurrenceRepo *repository.OccurrenceRepository
1818
lookAheadDuration time.Duration
1919
expansionInterval time.Duration
20+
gracePeriod time.Duration
2021
logger *zap.Logger
2122
}
2223

@@ -27,6 +28,7 @@ func NewExpander(
2728
occurrenceRepo *repository.OccurrenceRepository,
2829
lookAheadDuration time.Duration,
2930
expansionInterval time.Duration,
31+
gracePeriod time.Duration,
3032
logger *zap.Logger,
3133
) *Expander {
3234
logger.Debug("Initializing Expander", zap.Duration("lookAheadDuration", lookAheadDuration), zap.Duration("expansionInterval", expansionInterval))
@@ -36,6 +38,7 @@ func NewExpander(
3638
occurrenceRepo: occurrenceRepo,
3739
lookAheadDuration: lookAheadDuration,
3840
expansionInterval: expansionInterval,
41+
gracePeriod: gracePeriod,
3942
logger: logger,
4043
}
4144
}
@@ -116,6 +119,7 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
116119

117120
// Get next occurrences based on schedule configuration
118121
now := time.Now().UTC()
122+
graceStart := now.Add(-e.gracePeriod)
119123
endTime := now.Add(e.lookAheadDuration)
120124
startTime := event.StartTime.UTC()
121125

@@ -126,14 +130,14 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
126130
switch schedule.Frequency {
127131
case "daily":
128132
for t := startTime; t.Before(endTime); t = t.AddDate(0, 0, schedule.Interval) {
129-
if t.After(now) {
133+
if t.After(graceStart) {
130134
occurrences = append(occurrences, t)
131135
}
132136
}
133137
case "weekly":
134138
if len(schedule.ByDay) == 0 {
135139
for t := startTime; t.Before(endTime); t = t.AddDate(0, 0, 7*schedule.Interval) {
136-
if t.After(now) {
140+
if t.After(graceStart) {
137141
occurrences = append(occurrences, t)
138142
}
139143
}
@@ -152,7 +156,7 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
152156
weekday := weekdayMap[day]
153157
daysToAdd := (int(weekday) - int(t.Weekday()) + 7) % 7
154158
nextDay := t.AddDate(0, 0, daysToAdd)
155-
if nextDay.After(now) && nextDay.Before(endTime) {
159+
if nextDay.After(graceStart) && nextDay.Before(endTime) {
156160
occurrences = append(occurrences, nextDay)
157161
}
158162
}
@@ -161,15 +165,15 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
161165
case "monthly":
162166
if len(schedule.ByMonthDay) == 0 {
163167
for t := startTime; t.Before(endTime); t = t.AddDate(0, schedule.Interval, 0) {
164-
if t.After(now) {
168+
if t.After(graceStart) {
165169
occurrences = append(occurrences, t)
166170
}
167171
}
168172
} else {
169173
for t := startTime; t.Before(endTime); t = t.AddDate(0, schedule.Interval, 0) {
170174
for _, day := range schedule.ByMonthDay {
171175
nextDay := time.Date(t.Year(), t.Month(), day, t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location())
172-
if nextDay.After(now) && nextDay.Before(endTime) {
176+
if nextDay.After(graceStart) && nextDay.Before(endTime) {
173177
occurrences = append(occurrences, nextDay)
174178
}
175179
}
@@ -178,15 +182,15 @@ func (e *Expander) expandRecurringEvent(ctx context.Context, event *models.Event
178182
case "yearly":
179183
if len(schedule.ByMonth) == 0 {
180184
for t := startTime; t.Before(endTime); t = t.AddDate(schedule.Interval, 0, 0) {
181-
if t.After(now) {
185+
if t.After(graceStart) {
182186
occurrences = append(occurrences, t)
183187
}
184188
}
185189
} else {
186190
for t := startTime; t.Before(endTime); t = t.AddDate(schedule.Interval, 0, 0) {
187191
for _, month := range schedule.ByMonth {
188192
nextMonth := time.Date(t.Year(), time.Month(month), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location())
189-
if nextMonth.After(now) && nextMonth.Before(endTime) {
193+
if nextMonth.After(graceStart) && nextMonth.Before(endTime) {
190194
occurrences = append(occurrences, nextMonth)
191195
}
192196
}
@@ -239,13 +243,18 @@ func (e *Expander) expandNonRecurringEvent(ctx context.Context, event *models.Ev
239243
}
240244

241245
now := time.Now().UTC()
246+
graceStart := now.Add(-e.gracePeriod)
242247
startTime := event.StartTime.UTC()
243248

244249
endTime := now.Add(e.lookAheadDuration)
245250
if startTime.After(endTime) {
246251
e.logger.Info("Event is beyond look-ahead window, skipping", zap.String("event_id", event.ID.String()))
247252
return nil
248253
}
254+
if startTime.Before(graceStart) {
255+
e.logger.Info("Event is before grace period, skipping", zap.String("event_id", event.ID.String()))
256+
return nil
257+
}
249258

250259
occurrence := &models.Occurrence{
251260
OccurrenceID: uuid.New(),
@@ -277,3 +286,19 @@ func (e *Expander) Run(ctx context.Context) error {
277286
}
278287
}
279288
}
289+
290+
func (e *Expander) ExpandRecurringEvent(ctx context.Context, event *models.Event) error {
291+
return e.expandRecurringEvent(ctx, event)
292+
}
293+
294+
func (e *Expander) ExpandNonRecurringEvent(ctx context.Context, event *models.Event) error {
295+
return e.expandNonRecurringEvent(ctx, event)
296+
}
297+
298+
func (e *Expander) GracePeriod() time.Duration {
299+
return e.gracePeriod
300+
}
301+
302+
func (e *Expander) LookAheadDuration() time.Duration {
303+
return e.lookAheadDuration
304+
}

internal/scheduler/expander_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ func TestEventExpander(t *testing.T) {
3838
// Test configuration
3939
lookAheadDuration := 24 * time.Hour
4040
expansionInterval := 5 * time.Minute
41+
gracePeriod := 2 * time.Minute
4142

42-
expander := NewExpander(scheduler, eventRepo, occurrenceRepo, lookAheadDuration, expansionInterval, logger)
43+
expander := NewExpander(scheduler, eventRepo, occurrenceRepo, lookAheadDuration, expansionInterval, gracePeriod, logger)
4344

4445
t.Run("successful event expansion", func(t *testing.T) {
4546
cleanup()

main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ func main() {
187187
occurrenceRepo,
188188
cfg.Scheduler.LookAheadDuration,
189189
cfg.Scheduler.ExpansionInterval,
190+
cfg.Scheduler.GracePeriod,
190191
logger,
191192
)
192193

@@ -196,7 +197,7 @@ func main() {
196197
dispatcher := scheduler.NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger)
197198

198199
// Initialize handlers
199-
eventHandler := handlers.NewEventHandler(eventRepo)
200+
eventHandler := handlers.NewEventHandler(eventRepo, expander)
200201
occurrenceHandler := handlers.NewOccurrenceHandler(eventRepo, occurrenceRepo)
201202
tokenHandler := handlers.NewTokenHandler(tokenService)
202203

0 commit comments

Comments
 (0)