diff --git a/boltjobs/driver.go b/boltjobs/driver.go index d336037..db0bde7 100644 --- a/boltjobs/driver.go +++ b/boltjobs/driver.go @@ -55,11 +55,10 @@ type Driver struct { log *slog.Logger pq jobs.Queue pipeline atomic.Pointer[jobs.Pipeline] - cond *sync.Cond listeners atomic.Uint32 - active *uint64 - delayed *uint64 + active atomic.Uint64 + delayed atomic.Uint64 stopCh chan struct{} } @@ -106,10 +105,6 @@ func FromConfig(_ context.Context, tracer *sdktrace.TracerProvider, configKey st return new(bytes.Buffer) }, }, - cond: sync.NewCond(&sync.Mutex{}), - - delayed: new(uint64), - active: new(uint64), db: db, log: log, @@ -170,10 +165,6 @@ func FromPipeline(_ context.Context, tracer *sdktrace.TracerProvider, pipeline j bPool: sync.Pool{New: func() any { return new(bytes.Buffer) }}, - cond: sync.NewCond(&sync.Mutex{}), - - delayed: new(uint64), - active: new(uint64), db: db, log: log, @@ -216,7 +207,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error { return errors.E(op, err) } - atomic.AddUint64(d.delayed, 1) + d.delayed.Add(1) return nil } @@ -227,7 +218,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error { return errors.E(op, err) } - atomic.AddUint64(d.active, 1) + d.active.Add(1) return nil }) @@ -297,7 +288,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error { d.stopCh <- struct{}{} d.stopCh <- struct{}{} - d.listeners.Add(^uint32(0)) + d.listeners.Store(0) d.log.Debug("pipeline was paused", "driver", pipe.Driver(), "pipeline", pipe.Name(), "start", start, "elapsed", time.Since(start)) @@ -340,9 +331,9 @@ func (d *Driver) State(ctx context.Context) (*jobs.State, error) { Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: PushBucket, - Priority: uint64(pipe.Priority()), //nolint:gosec - Active: int64(atomic.LoadUint64(d.active)), //nolint:gosec - Delayed: int64(atomic.LoadUint64(d.delayed)), //nolint:gosec + Priority: uint64(pipe.Priority()), //nolint:gosec + Active: int64(d.active.Load()), //nolint:gosec + Delayed: int64(d.delayed.Load()), //nolint:gosec Ready: d.listeners.Load() > 0, }, nil } @@ -368,12 +359,26 @@ func create(db *bolt.DB) error { } inQb := tx.Bucket(strToBytes(InQueueBucket)) - cursor := inQb.Cursor() - pushB := tx.Bucket(strToBytes(PushBucket)) - for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - err = pushB.Put(k, v) + // Collect all in-queue entries before any deletes: deleting the + // current key mid-iteration shifts the underlying inode slice and + // causes cursor.Next() to skip the next entry. + type kv struct{ k, v []byte } + var entries []kv + cur := inQb.Cursor() + for k, v := cur.First(); k != nil; k, v = cur.Next() { + entries = append(entries, kv{ + k: append([]byte(nil), k...), + v: append([]byte(nil), v...), + }) + } + for _, e := range entries { + err = pushB.Put(e.k, e.v) + if err != nil { + return errors.E(upOp, err) + } + err = inQb.Delete(e.k) if err != nil { return errors.E(upOp, err) } diff --git a/boltjobs/item.go b/boltjobs/item.go index 3710f8b..323cb45 100644 --- a/boltjobs/item.go +++ b/boltjobs/item.go @@ -44,8 +44,8 @@ type Options struct { // private db *bbolt.DB - active *uint64 - delayed *uint64 + active *atomic.Uint64 + delayed *atomic.Uint64 } func (i *Item) ID() string { @@ -96,9 +96,9 @@ func (i *Item) Context() ([]byte, error) { func (i *Item) Ack() error { defer func() { if i.Options.Delay > 0 { - atomic.AddUint64(i.Options.delayed, ^uint64(0)) + i.Options.delayed.Add(^uint64(0)) } else { - atomic.AddUint64(i.Options.active, ^uint64(0)) + i.Options.active.Add(^uint64(0)) } }() @@ -234,7 +234,7 @@ func (i *Item) Respond(_ []byte, _ string) error { return nil } -func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) { +func (i *Item) attachDB(db *bbolt.DB, active, delayed *atomic.Uint64) { i.Options.db = db i.Options.active = active i.Options.delayed = delayed diff --git a/boltjobs/listener.go b/boltjobs/listener.go index e3bcfb8..5b450b9 100644 --- a/boltjobs/listener.go +++ b/boltjobs/listener.go @@ -4,7 +4,7 @@ import ( "bytes" "context" "encoding/gob" - "sync/atomic" + "slices" "time" bolt "go.etcd.io/bbolt" @@ -22,7 +22,7 @@ func (d *Driver) listener() { _ = d.pq.Remove((*d.pipeline.Load()).Name()) return case <-tt.C: - if atomic.LoadUint64(d.active) > uint64(d.prefetch) { //nolint:gosec + if d.active.Load() > uint64(d.prefetch) { //nolint:gosec time.Sleep(time.Second) continue } @@ -105,7 +105,7 @@ func (d *Driver) listener() { } d.prop.Inject(ctx, propagation.HeaderCarrier(item.headers)) - item.attachDB(d.db, d.active, d.delayed) + item.attachDB(d.db, &d.active, &d.delayed) d.pq.Insert(item) span.End() } @@ -137,7 +137,12 @@ func (d *Driver) delayedJobsListener() { cursor := delayB.Cursor() endDate := strToBytes(time.Now().UTC().Format(time.RFC3339)) - for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() { + // Collect items ready to be enqueued; defer pq.Insert until after + // a successful commit so that a rollback does not leave ghost + // items in the in-memory queue. + var ready []*Item + txOk := true + for k, v := cursor.Seek(startDate); k != nil && slices.Compare(k, endDate) <= 0; k, v = cursor.Next() { buf := bytes.NewReader(v) dec := gob.NewDecoder(buf) @@ -145,7 +150,8 @@ func (d *Driver) delayedJobsListener() { err = dec.Decode(item) if err != nil { d.rollback(err, tx) - continue + txOk = false + break } if item.Options.Priority == 0 { @@ -160,18 +166,24 @@ func (d *Driver) delayedJobsListener() { err = inQb.Put(strToBytes(item.ID()), v) if err != nil { d.rollback(err, tx) - continue + txOk = false + break } } err = delayB.Delete(k) if err != nil { d.rollback(err, tx) - continue + txOk = false + break } - item.attachDB(d.db, d.active, d.delayed) - d.pq.Insert(item) + item.attachDB(d.db, &d.active, &d.delayed) + ready = append(ready, item) + } + + if !txOk { + continue } err = tx.Commit() @@ -179,6 +191,10 @@ func (d *Driver) delayedJobsListener() { d.rollback(err, tx) continue } + + for _, item := range ready { + d.pq.Insert(item) + } } } } @@ -186,7 +202,7 @@ func (d *Driver) delayedJobsListener() { func (d *Driver) rollback(err error, tx *bolt.Tx) { errR := tx.Rollback() if errR != nil { - d.log.Error("transaction commit error, rollback failed", "error", err, "error", errR) + d.log.Error("transaction commit error, rollback failed", "error", err, "rollback_error", errR) return } diff --git a/boltkv/driver.go b/boltkv/driver.go index 5ffeb8c..84c60c7 100644 --- a/boltkv/driver.go +++ b/boltkv/driver.go @@ -159,13 +159,13 @@ func (d *Driver) Get(ctx context.Context, key string) ([]byte, error) { buf := bytes.NewReader(val) decoder := gob.NewDecoder(buf) - var i string + var i []byte err := decoder.Decode(&i) if err != nil { return errors.E(op, err) } - val = strToBytes(i) + val = i } return nil }) @@ -423,38 +423,7 @@ func (d *Driver) startGCLoop() { for { select { case <-t.C: - d.clearMu.RLock() - - now := time.Now().UTC() - d.gc.Range(func(key, value any) bool { - const op = errors.Op("boltdb_plugin_gc") - k := key.(string) - v, err := time.Parse(time.RFC3339, value.(string)) - if err != nil { - d.log.Error("failed to parse TTL, removing entry", "key", k, "error", err) - d.gc.Delete(k) - return true - } - - if now.After(v) { - d.gc.Delete(k) - d.log.Debug("key deleted", "key", k) - err := d.DB.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(d.bucket) - if b == nil { - return errors.E(op, errors.NoSuchBucket) - } - return b.Delete(strToBytes(k)) - }) - if err != nil { - d.log.Error("error during the gc phase of update", "error", err) - return false - } - } - return true - }) - - d.clearMu.RUnlock() + d.gcRun() case <-d.stop: err := d.DB.Close() if err != nil { @@ -465,6 +434,40 @@ func (d *Driver) startGCLoop() { } } +func (d *Driver) gcRun() { + d.clearMu.RLock() + defer d.clearMu.RUnlock() + + now := time.Now().UTC() + d.gc.Range(func(key, value any) bool { + const op = errors.Op("boltdb_plugin_gc") + k := key.(string) + v, err := time.Parse(time.RFC3339, value.(string)) + if err != nil { + d.log.Error("failed to parse TTL, removing entry", "key", k, "error", err) + d.gc.Delete(k) + return true + } + + if now.After(v) { + d.gc.Delete(k) + d.log.Debug("key deleted", "key", k) + err := d.DB.Update(func(tx *bolt.Tx) error { + b := tx.Bucket(d.bucket) + if b == nil { + return errors.E(op, errors.NoSuchBucket) + } + return b.Delete(strToBytes(k)) + }) + if err != nil { + d.log.Error("error during the gc phase of update", "error", err) + return false + } + } + return true + }) +} + func strToBytes(data string) []byte { if data == "" { return nil