diff --git a/CHANGELOG.md b/CHANGELOG.md index 98e7d4a1..f3b311fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `JobCountByQueueAndState` now returns consistent results across drivers, including requested queues with zero jobs, and deduplicates repeated queue names in input. This resolves an issue with the sqlite driver in River UI reported in [riverqueue/riverui#496](https://github.com/riverqueue/riverui#496). [PR #1140](https://github.com/riverqueue/river/pull/1140). + ## [0.30.2] - 2026-01-26 ### Fixed diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index e9e843ca..14ec9c43 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -133,7 +133,7 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many WITH all_queues AS ( - SELECT unnest($1::text[])::text AS queue + SELECT DISTINCT unnest($1::text[])::text AS queue ), running_job_counts AS ( diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index 27aade13..9cd9a434 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -787,6 +787,56 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, int64(1), countsByQueue[1].CountAvailable) require.Equal(t, int64(1), countsByQueue[1].CountRunning) }) + + t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: []string{"queue1", "queue2"}, + Schema: "", + }) + require.NoError(t, err) + + require.Len(t, countsByQueue, 2) + + require.Equal(t, "queue1", countsByQueue[0].Queue) + require.Equal(t, int64(0), countsByQueue[0].CountAvailable) + require.Equal(t, int64(0), countsByQueue[0].CountRunning) + + require.Equal(t, "queue2", countsByQueue[1].Queue) + require.Equal(t, int64(1), countsByQueue[1].CountAvailable) + require.Equal(t, int64(1), countsByQueue[1].CountRunning) + }) + + t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: []string{"queue2", "queue1", "queue1"}, + Schema: "", + }) + require.NoError(t, err) + + require.Len(t, countsByQueue, 2) + + require.Equal(t, "queue1", countsByQueue[0].Queue) + require.Equal(t, int64(0), countsByQueue[0].CountAvailable) + require.Equal(t, int64(0), countsByQueue[0].CountRunning) + + require.Equal(t, "queue2", countsByQueue[1].Queue) + require.Equal(t, int64(1), countsByQueue[1].CountAvailable) + require.Equal(t, int64(1), countsByQueue[1].CountRunning) + }) }) t.Run("JobCountByState", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 95a2493e..dde85de4 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -87,7 +87,7 @@ GROUP BY state; -- name: JobCountByQueueAndState :many WITH all_queues AS ( - SELECT unnest(@queue_names::text[])::text AS queue + SELECT DISTINCT unnest(@queue_names::text[])::text AS queue ), running_job_counts AS ( diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 25b2a95a..a86a2235 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -130,7 +130,7 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many WITH all_queues AS ( - SELECT unnest($1::text[])::text AS queue + SELECT DISTINCT unnest($1::text[])::text AS queue ), running_job_counts AS ( diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 7e6af4bc..05b29489 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -295,14 +295,46 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdri if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ + + // The PostgreSQL drivers implement this query with an `all_queues` CTE and + // LEFT JOINs, so they return one row per requested queue, including queues + // that currently have no jobs. The input queue list is deduplicated in SQL. + // The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`, + // and we haven't found a workable way to bind a parameterized list through + // `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore + // returns only queues with matching rows, and this wrapper fills in missing + // queues to match PostgreSQL behavior. + countsByQueue := make(map[string]struct { + CountAvailable int64 + CountRunning int64 + }, len(rows)) + for _, row := range rows { + countsByQueue[row.Queue] = struct { + CountAvailable int64 + CountRunning int64 + }{ CountAvailable: row.CountAvailable, CountRunning: row.CountRunning, - Queue: row.Queue, } } + + queueNames := slices.Clone(params.QueueNames) + slices.Sort(queueNames) + queueNames = slices.Compact(queueNames) + + results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames)) + for _, queueName := range queueNames { + result := &riverdriver.JobCountByQueueAndStateResult{ + Queue: queueName, + } + if counts, ok := countsByQueue[queueName]; ok { + result.CountAvailable = counts.CountAvailable + result.CountRunning = counts.CountRunning + } + + results = append(results, result) + } + return results, nil }