Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 26 additions & 21 deletions boltjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ type Driver struct {
log *slog.Logger
pq jobs.Queue
pipeline atomic.Pointer[jobs.Pipeline]
cond *sync.Cond

listeners atomic.Uint32
active *uint64
delayed *uint64
active atomic.Uint64
delayed atomic.Uint64

stopCh chan struct{}
}
Expand Down Expand Up @@ -106,10 +105,6 @@ func FromConfig(_ context.Context, tracer *sdktrace.TracerProvider, configKey st
return new(bytes.Buffer)
},
},
cond: sync.NewCond(&sync.Mutex{}),

delayed: new(uint64),
active: new(uint64),

db: db,
log: log,
Expand Down Expand Up @@ -170,10 +165,6 @@ func FromPipeline(_ context.Context, tracer *sdktrace.TracerProvider, pipeline j
bPool: sync.Pool{New: func() any {
return new(bytes.Buffer)
}},
cond: sync.NewCond(&sync.Mutex{}),

delayed: new(uint64),
active: new(uint64),

db: db,
log: log,
Expand Down Expand Up @@ -216,7 +207,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
return errors.E(op, err)
}

atomic.AddUint64(d.delayed, 1)
d.delayed.Add(1)

return nil
}
Expand All @@ -227,7 +218,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
return errors.E(op, err)
}

atomic.AddUint64(d.active, 1)
d.active.Add(1)

return nil
})
Expand Down Expand Up @@ -297,7 +288,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
d.stopCh <- struct{}{}
d.stopCh <- struct{}{}

d.listeners.Add(^uint32(0))
d.listeners.Store(0)

d.log.Debug("pipeline was paused", "driver", pipe.Driver(), "pipeline", pipe.Name(), "start", start, "elapsed", time.Since(start))

Expand Down Expand Up @@ -340,9 +331,9 @@ func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
Queue: PushBucket,
Priority: uint64(pipe.Priority()), //nolint:gosec
Active: int64(atomic.LoadUint64(d.active)), //nolint:gosec
Delayed: int64(atomic.LoadUint64(d.delayed)), //nolint:gosec
Priority: uint64(pipe.Priority()), //nolint:gosec
Active: int64(d.active.Load()), //nolint:gosec
Delayed: int64(d.delayed.Load()), //nolint:gosec
Ready: d.listeners.Load() > 0,
}, nil
}
Expand All @@ -368,12 +359,26 @@ func create(db *bolt.DB) error {
}

inQb := tx.Bucket(strToBytes(InQueueBucket))
cursor := inQb.Cursor()

pushB := tx.Bucket(strToBytes(PushBucket))

for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
err = pushB.Put(k, v)
// Collect all in-queue entries before any deletes: deleting the
// current key mid-iteration shifts the underlying inode slice and
// causes cursor.Next() to skip the next entry.
type kv struct{ k, v []byte }
var entries []kv
cur := inQb.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
entries = append(entries, kv{
k: append([]byte(nil), k...),
v: append([]byte(nil), v...),
})
}
for _, e := range entries {
err = pushB.Put(e.k, e.v)
if err != nil {
return errors.E(upOp, err)
}
err = inQb.Delete(e.k)
if err != nil {
return errors.E(upOp, err)
}
Expand Down
10 changes: 5 additions & 5 deletions boltjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type Options struct {

// private
db *bbolt.DB
active *uint64
delayed *uint64
active *atomic.Uint64
delayed *atomic.Uint64
}

func (i *Item) ID() string {
Expand Down Expand Up @@ -96,9 +96,9 @@ func (i *Item) Context() ([]byte, error) {
func (i *Item) Ack() error {
defer func() {
if i.Options.Delay > 0 {
atomic.AddUint64(i.Options.delayed, ^uint64(0))
i.Options.delayed.Add(^uint64(0))
} else {
atomic.AddUint64(i.Options.active, ^uint64(0))
i.Options.active.Add(^uint64(0))
}
}()

Expand Down Expand Up @@ -234,7 +234,7 @@ func (i *Item) Respond(_ []byte, _ string) error {
return nil
}

func (i *Item) attachDB(db *bbolt.DB, active, delayed *uint64) {
func (i *Item) attachDB(db *bbolt.DB, active, delayed *atomic.Uint64) {
i.Options.db = db
i.Options.active = active
i.Options.delayed = delayed
Expand Down
36 changes: 26 additions & 10 deletions boltjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"context"
"encoding/gob"
"sync/atomic"
"slices"
"time"

bolt "go.etcd.io/bbolt"
Expand All @@ -22,7 +22,7 @@ func (d *Driver) listener() {
_ = d.pq.Remove((*d.pipeline.Load()).Name())
return
case <-tt.C:
if atomic.LoadUint64(d.active) > uint64(d.prefetch) { //nolint:gosec
if d.active.Load() > uint64(d.prefetch) { //nolint:gosec
time.Sleep(time.Second)
continue
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (d *Driver) listener() {
}

d.prop.Inject(ctx, propagation.HeaderCarrier(item.headers))
item.attachDB(d.db, d.active, d.delayed)
item.attachDB(d.db, &d.active, &d.delayed)
d.pq.Insert(item)
span.End()
}
Expand Down Expand Up @@ -137,15 +137,21 @@ func (d *Driver) delayedJobsListener() {
cursor := delayB.Cursor()
endDate := strToBytes(time.Now().UTC().Format(time.RFC3339))

for k, v := cursor.Seek(startDate); k != nil && bytes.Compare(k, endDate) <= 0; k, v = cursor.Next() {
// Collect items ready to be enqueued; defer pq.Insert until after
// a successful commit so that a rollback does not leave ghost
// items in the in-memory queue.
var ready []*Item
txOk := true
for k, v := cursor.Seek(startDate); k != nil && slices.Compare(k, endDate) <= 0; k, v = cursor.Next() {
buf := bytes.NewReader(v)
dec := gob.NewDecoder(buf)

item := &Item{}
err = dec.Decode(item)
if err != nil {
d.rollback(err, tx)
continue
txOk = false
break
}

if item.Options.Priority == 0 {
Expand All @@ -160,33 +166,43 @@ func (d *Driver) delayedJobsListener() {
err = inQb.Put(strToBytes(item.ID()), v)
if err != nil {
d.rollback(err, tx)
continue
txOk = false
break
}
}

err = delayB.Delete(k)
if err != nil {
d.rollback(err, tx)
continue
txOk = false
break
}

item.attachDB(d.db, d.active, d.delayed)
d.pq.Insert(item)
item.attachDB(d.db, &d.active, &d.delayed)
ready = append(ready, item)
}

if !txOk {
continue
Comment thread
rustatian marked this conversation as resolved.
}

err = tx.Commit()
if err != nil {
d.rollback(err, tx)
continue
}

for _, item := range ready {
d.pq.Insert(item)
}
}
}
}

func (d *Driver) rollback(err error, tx *bolt.Tx) {
errR := tx.Rollback()
if errR != nil {
d.log.Error("transaction commit error, rollback failed", "error", err, "error", errR)
d.log.Error("transaction commit error, rollback failed", "error", err, "rollback_error", errR)
return
}

Expand Down
71 changes: 37 additions & 34 deletions boltkv/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ func (d *Driver) Get(ctx context.Context, key string) ([]byte, error) {
buf := bytes.NewReader(val)
decoder := gob.NewDecoder(buf)

var i string
var i []byte
err := decoder.Decode(&i)
if err != nil {
return errors.E(op, err)
}

val = strToBytes(i)
val = i
}
return nil
})
Expand Down Expand Up @@ -423,38 +423,7 @@ func (d *Driver) startGCLoop() {
for {
select {
case <-t.C:
d.clearMu.RLock()

now := time.Now().UTC()
d.gc.Range(func(key, value any) bool {
const op = errors.Op("boltdb_plugin_gc")
k := key.(string)
v, err := time.Parse(time.RFC3339, value.(string))
if err != nil {
d.log.Error("failed to parse TTL, removing entry", "key", k, "error", err)
d.gc.Delete(k)
return true
}

if now.After(v) {
d.gc.Delete(k)
d.log.Debug("key deleted", "key", k)
err := d.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(d.bucket)
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
return b.Delete(strToBytes(k))
})
if err != nil {
d.log.Error("error during the gc phase of update", "error", err)
return false
}
}
return true
})

d.clearMu.RUnlock()
d.gcRun()
case <-d.stop:
err := d.DB.Close()
if err != nil {
Expand All @@ -465,6 +434,40 @@ func (d *Driver) startGCLoop() {
}
}

func (d *Driver) gcRun() {
d.clearMu.RLock()
defer d.clearMu.RUnlock()

now := time.Now().UTC()
d.gc.Range(func(key, value any) bool {
const op = errors.Op("boltdb_plugin_gc")
k := key.(string)
v, err := time.Parse(time.RFC3339, value.(string))
if err != nil {
d.log.Error("failed to parse TTL, removing entry", "key", k, "error", err)
d.gc.Delete(k)
return true
}

if now.After(v) {
d.gc.Delete(k)
d.log.Debug("key deleted", "key", k)
err := d.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(d.bucket)
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
return b.Delete(strToBytes(k))
})
if err != nil {
d.log.Error("error during the gc phase of update", "error", err)
return false
}
}
return true
})
}

func strToBytes(data string) []byte {
if data == "" {
return nil
Expand Down
Loading