Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,11 @@ type Config struct {
// The effect of hooks in this list will depend on the specific hook
// interfaces they implement, so for example implementing
// rivertype.HookInsertBegin will cause the hook to be invoked before a job
// is inserted, or implementing rivertype.HookWorkBegin will cause it to be
// invoked before a job is worked. Hook structs may implement multiple hook
// interfaces.
// is inserted, implementing rivertype.HookMetricEmit will cause the hook to
// be invoked when River emits a metric, or implementing
// rivertype.HookWorkBegin will cause it to be invoked before a job is
// worked. Hook structs may
// implement multiple hook interfaces.
//
// Order in this list is significant. A hook that appears first will be
// entered before a hook that appears later. For any particular phase, order
Expand Down
10 changes: 10 additions & 0 deletions hook_defaults_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ func (f HookInsertBeginFunc) InsertBegin(ctx context.Context, params *rivertype.

func (f HookInsertBeginFunc) IsHook() bool { return true }

// HookMetricEmitFunc is a convenience helper for implementing
// rivertype.HookMetricEmit using a simple function instead of a struct.
type HookMetricEmitFunc func(ctx context.Context, params *rivertype.HookMetricEmitParams)

func (f HookMetricEmitFunc) IsHook() bool { return true }

func (f HookMetricEmitFunc) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) {
f(ctx, params)
}

// HookPeriodicJobsStartFunc is a convenience helper for implementing
// rivertype.HookPeriodicJobsStart using a simple function instead of a struct.
type HookPeriodicJobsStartFunc func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error
Expand Down
6 changes: 6 additions & 0 deletions hook_defaults_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,15 @@ var (
_ rivertype.Hook = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil })
_ rivertype.HookInsertBegin = HookInsertBeginFunc(func(ctx context.Context, params *rivertype.JobInsertParams) error { return nil })

_ rivertype.Hook = HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {})
_ rivertype.HookMetricEmit = HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {})

_ rivertype.Hook = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil })
_ rivertype.HookPeriodicJobsStart = HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error { return nil })

_ rivertype.Hook = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil })
_ rivertype.HookWorkBegin = HookWorkBeginFunc(func(ctx context.Context, job *rivertype.JobRow) error { return nil })

_ rivertype.Hook = HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { return err })
_ rivertype.HookWorkEnd = HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { return err })
)
7 changes: 7 additions & 0 deletions internal/hooklookup/hook_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type HookKind string

const (
HookKindInsertBegin HookKind = "insert_begin"
HookKindMetricEmit HookKind = "metric_emit"
HookKindPeriodicJobsStart HookKind = "periodic_job_start"
HookKindWorkBegin HookKind = "work_begin"
HookKindWorkEnd HookKind = "work_end"
Expand Down Expand Up @@ -84,6 +85,12 @@ func (c *hookLookup) ByHookKind(kind HookKind) []rivertype.Hook {
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
}
}
case HookKindMetricEmit:
for _, hook := range c.hooks {
if typedHook, ok := hook.(rivertype.HookMetricEmit); ok {
c.hooksByKind[kind] = append(c.hooksByKind[kind], typedHook)
}
}
case HookKindPeriodicJobsStart:
for _, hook := range c.hooks {
if typedHook, ok := hook.(rivertype.HookPeriodicJobsStart); ok {
Expand Down
22 changes: 21 additions & 1 deletion internal/hooklookup/hook_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestHookLookup(t *testing.T) {
return NewHookLookup([]rivertype.Hook{ //nolint:forcetypeassert
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
&testHookMetricEmit{},
&testHookWorkBegin{},
&testHookWorkEnd{},
}).(*hookLookup), &testBundle{}
Expand All @@ -35,6 +36,9 @@ func TestHookLookup(t *testing.T) {
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
}, hookLookup.ByHookKind(HookKindInsertBegin))
require.Equal(t, []rivertype.Hook{
&testHookMetricEmit{},
}, hookLookup.ByHookKind(HookKindMetricEmit))
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
Expand All @@ -43,13 +47,16 @@ func TestHookLookup(t *testing.T) {
&testHookWorkEnd{},
}, hookLookup.ByHookKind(HookKindWorkEnd))

require.Len(t, hookLookup.hooksByKind, 3)
require.Len(t, hookLookup.hooksByKind, 4)

// Repeat lookups to make sure we get the same result.
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookInsertBegin{},
}, hookLookup.ByHookKind(HookKindInsertBegin))
require.Equal(t, []rivertype.Hook{
&testHookMetricEmit{},
}, hookLookup.ByHookKind(HookKindMetricEmit))
require.Equal(t, []rivertype.Hook{
&testHookInsertAndWorkBegin{},
&testHookWorkBegin{},
Expand All @@ -75,6 +82,7 @@ func TestHookLookup(t *testing.T) {
}

parallelLookupLoop(HookKindInsertBegin)
parallelLookupLoop(HookKindMetricEmit)
parallelLookupLoop(HookKindWorkBegin)
parallelLookupLoop(HookKindInsertBegin)
parallelLookupLoop(HookKindWorkBegin)
Expand All @@ -100,6 +108,7 @@ func TestEmptyHookLookup(t *testing.T) {
hookLookup, _ := setup(t)

require.Nil(t, hookLookup.ByHookKind(HookKindInsertBegin))
require.Nil(t, hookLookup.ByHookKind(HookKindMetricEmit))
require.Nil(t, hookLookup.ByHookKind(HookKindWorkBegin))
})
}
Expand Down Expand Up @@ -241,6 +250,17 @@ func (t *testHookInsertBegin) InsertBegin(ctx context.Context, params *rivertype
return nil
}

//
// testHookMetricEmit
//

var _ rivertype.HookMetricEmit = &testHookMetricEmit{}

type testHookMetricEmit struct{ rivertype.Hook }

func (t *testHookMetricEmit) MetricEmit(ctx context.Context, params *rivertype.HookMetricEmitParams) {
}

//
// testHookWorkBegin
//
Expand Down
70 changes: 60 additions & 10 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,16 @@ type producer struct {
// Jobs which are currently being worked. Only used by main goroutine.
activeJobs map[int64]*jobexecutor.JobExecutor

completer jobcompleter.JobCompleter
config *producerConfig
id atomic.Int64 // atomic because it's written at startup and read during shutdown
exec riverdriver.Executor
errorHandler jobexecutor.ErrorHandler
fetchLimiter *chanutil.DebouncedChan
state riverpilot.ProducerState
pilot riverpilot.Pilot
workers *Workers
completer jobcompleter.JobCompleter
config *producerConfig
id atomic.Int64 // atomic because it's written at startup and read during shutdown
exec riverdriver.Executor
errorHandler jobexecutor.ErrorHandler
fetchLimiter *chanutil.DebouncedChan
metricEmitHooks []rivertype.HookMetricEmit // memoized hooks of type HookMetricEmit for reuse in dispatchWork
state riverpilot.ProducerState
pilot riverpilot.Pilot
workers *Workers

// Receives job IDs to cancel. Written by notifier goroutine, only read from
// main goroutine.
Expand Down Expand Up @@ -233,7 +234,7 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi
errorHandler = &errorHandlerAdapter{config.ErrorHandler}
}

return baseservice.Init(archetype, &producer{
producer := baseservice.Init(archetype, &producer{
activeJobs: make(map[int64]*jobexecutor.JobExecutor),
cancelCh: make(chan int64, 1000),
completer: config.Completer,
Expand All @@ -247,6 +248,10 @@ func newProducer(archetype *baseservice.Archetype, exec riverdriver.Executor, pi
retryPolicy: config.RetryPolicy,
workers: config.Workers,
})

producer.metricEmitHooks = producer.metricEmitHooksFromLookup()

return producer
}

// Start starts the producer. It backgrounds a goroutine which is stopped when
Expand Down Expand Up @@ -743,6 +748,25 @@ func (p *producer) maybeCancelJob(ctx context.Context, id int64) {
executor.Cancel(ctx)
}

func (p *producer) metricEmitHooksFromLookup() []rivertype.HookMetricEmit {
hookLookup := p.config.HookLookupGlobal
if hookLookup == nil {
return nil
}

hooks := hookLookup.ByHookKind(hooklookup.HookKindMetricEmit)
if len(hooks) < 1 {
return nil
}

metricEmitHooks := make([]rivertype.HookMetricEmit, len(hooks))
for i, hook := range hooks {
metricEmitHooks[i] = hook.(rivertype.HookMetricEmit) //nolint:forcetypeassert
}

return metricEmitHooks
}

func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally removes any deadlines or cancellation from the parent
// context because we don't want it to get cancelled if the producer is asked
Expand All @@ -757,6 +781,11 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
// rarely hit, but exists to protect against degenerate cases.
const maxAttemptedBy = 100

var startedAt time.Time
if len(p.metricEmitHooks) > 0 {
startedAt = time.Now()
}

jobs, err := p.pilot.JobGetAvailable(ctx, p.exec, p.state, &riverdriver.JobGetAvailableParams{
ClientID: p.config.ClientID,
MaxAttemptedBy: maxAttemptedBy,
Expand All @@ -771,9 +800,30 @@ func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultC
return
}

if len(p.metricEmitHooks) > 0 {
p.emitMetric(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableDurationMetric{
Duration: time.Since(startedAt),
Queue: p.config.Queue,
},
})
p.emitMetric(ctx, &rivertype.HookMetricEmitParams{
Metric: &rivertype.JobGetAvailableCountMetric{
Count: len(jobs),
Queue: p.config.Queue,
},
})
}

fetchResultCh <- producerFetchResult{jobs: jobs}
}

func (p *producer) emitMetric(ctx context.Context, params *rivertype.HookMetricEmitParams) {
for _, hook := range p.metricEmitHooks {
hook.MetricEmit(ctx, params)
}
}

// Periodically logs an informational log line giving some insight into the
// current state of the producer.
func (p *producer) heartbeatLogLoop(ctx context.Context, wg *sync.WaitGroup) {
Expand Down
130 changes: 130 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ import (

const testClientID = "test-client-id"

type countingHookLookup struct {
hooklookup.HookLookupInterface

count int
}

func (l *countingHookLookup) ByHookKind(kind hooklookup.HookKind) []rivertype.Hook {
l.count++
return l.HookLookupInterface.ByHookKind(kind)
}

func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {
// We have encountered previous data races with the list of active jobs on
// Producer because we need to know the count of active jobs in order to
Expand Down Expand Up @@ -161,6 +172,125 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) {
}
}

func TestProducer_MetricEmitHook(t *testing.T) {
t.Parallel()

ctx := context.Background()

type testBundle struct {
archetype *baseservice.Archetype
config *Config
exec riverdriver.Executor
hookLookup *countingHookLookup
metrics chan *rivertype.HookMetricEmitParams
producer *producer
queue string
schema string
}

setup := func(t *testing.T) *testBundle {
t.Helper()

var (
archetype = riversharedtest.BaseServiceArchetype(t)
driver = riverpgxv5.New(riversharedtest.DBPool(ctx, t))
exec = driver.GetExecutor()
jobUpdates = make(chan []jobcompleter.CompleterJobUpdated, 10)
metrics = make(chan *rivertype.HookMetricEmitParams, 10)
pilot = &riverpilot.StandardPilot{}
queueName = "test_producer_metric_hook"
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
)

t.Cleanup(riverinternaltest.DiscardContinuously(jobUpdates))

completer := jobcompleter.NewInlineCompleter(archetype, schema, exec, pilot, jobUpdates)
t.Cleanup(completer.Stop)

metricHook := HookMetricEmitFunc(func(ctx context.Context, params *rivertype.HookMetricEmitParams) {
paramsCopy := *params
metrics <- &paramsCopy
})
hookLookup := &countingHookLookup{
HookLookupInterface: hooklookup.NewHookLookup([]rivertype.Hook{metricHook}),
}

producer := newProducer(archetype, exec, pilot, &producerConfig{
ClientID: testClientID,
Completer: completer,
ErrorHandler: newTestErrorHandler(),
FetchCooldown: FetchCooldownDefault,
FetchPollInterval: 50 * time.Millisecond,
HookLookupByJob: hooklookup.NewJobHookLookup(),
HookLookupGlobal: hookLookup,
JobTimeout: JobTimeoutDefault,
MaxWorkers: 1_000,
MiddlewareLookupGlobal: middlewarelookup.NewMiddlewareLookup(nil),
Queue: queueName,
QueuePollInterval: queuePollIntervalDefault,
QueueReportInterval: queueReportIntervalDefault,
RetryPolicy: &DefaultClientRetryPolicy{},
SchedulerInterval: riverinternaltest.SchedulerShortInterval,
Schema: schema,
StaleProducerRetentionPeriod: time.Minute,
Workers: NewWorkers(),
})

return &testBundle{
archetype: archetype,
config: newTestConfig(t, schema),
exec: exec,
hookLookup: hookLookup,
metrics: metrics,
producer: producer,
queue: queueName,
schema: schema,
}
}

bundle := setup(t)

scheduledAt := time.Now().UTC().Add(-time.Second)
insertParams := make([]*riverdriver.JobInsertFastParams, 2)
for i := range insertParams {
params, err := insertParamsFromConfigArgsAndOptions(bundle.archetype, bundle.config, noOpArgs{}, &InsertOpts{
Queue: bundle.queue,
})
require.NoError(t, err)
params.ScheduledAt = &scheduledAt
insertParams[i] = (*riverdriver.JobInsertFastParams)(params)
}

_, err := bundle.exec.JobInsertFastMany(ctx, &riverdriver.JobInsertFastManyParams{
Jobs: insertParams,
Schema: bundle.schema,
})
require.NoError(t, err)

fetchResultCh := make(chan producerFetchResult, 1)
bundle.producer.dispatchWork(ctx, 2, fetchResultCh)

fetchResult := riversharedtest.WaitOrTimeout(t, fetchResultCh)
require.NoError(t, fetchResult.err)
require.Len(t, fetchResult.jobs, 2)
require.Equal(t, 1, bundle.hookLookup.count)

metricsByName := make(map[rivertype.MetricName]rivertype.Metric)
for _, metric := range riversharedtest.WaitOrTimeoutN(t, bundle.metrics, 2) {
metricsByName[metric.Metric.Name()] = metric.Metric
}

durationMetric, durationMetricFound := metricsByName[rivertype.MetricNameJobGetAvailableDuration].(*rivertype.JobGetAvailableDurationMetric)
require.True(t, durationMetricFound)
require.Equal(t, bundle.queue, durationMetric.Queue)
require.GreaterOrEqual(t, durationMetric.Duration, time.Duration(0))

countMetric, countMetricFound := metricsByName[rivertype.MetricNameJobGetAvailableCount].(*rivertype.JobGetAvailableCountMetric)
require.True(t, countMetricFound)
require.Equal(t, bundle.queue, countMetric.Queue)
require.Equal(t, 2, countMetric.Count)
}

func TestProducer_PollOnly(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading