From 15aa344089d43230d11ed4edaa840291141f00c7 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Fri, 6 Feb 2026 10:28:29 -0600 Subject: [PATCH] JobCountByQueueAndState: fix empty queue parity, coverage `JobCountByQueueAndState` returned inconsistent results across drivers when callers requested queues with no matching jobs. PostgreSQL returned those requested queues with zero counts because the query materialized an `all_queues` CTE, while SQLite needed wrapper-level fill-in to produce the same rows. This change keeps the SQLite `sqlc` query for counting existing queues, but makes the wrapper complete missing requested queues so behavior is aligned. It also defines deduplicated queue-name semantics consistently: PostgreSQL now uses `DISTINCT` in `all_queues`, and SQLite deduplicates its sorted input in the wrapper to match. Driver conformance coverage now explicitly checks missing requested queues and deduplicated queue-name input for `JobCountByQueueAndState`, so parity stays enforced across drivers. Fixes riverqueue/riverui#496. --- CHANGELOG.md | 4 ++ .../internal/dbsqlc/river_job.sql.go | 2 +- .../riverdrivertest/riverdrivertest.go | 50 +++++++++++++++++++ .../riverpgxv5/internal/dbsqlc/river_job.sql | 2 +- .../internal/dbsqlc/river_job.sql.go | 2 +- .../riversqlite/river_sqlite_driver.go | 40 +++++++++++++-- 6 files changed, 93 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98e7d4a1..f3b311fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `JobCountByQueueAndState` now returns consistent results across drivers, including requested queues with zero jobs, and deduplicates repeated queue names in input. This resolves an issue with the sqlite driver in River UI reported in [riverqueue/riverui#496](https://github.com/riverqueue/riverui#496). [PR #1140](https://github.com/riverqueue/river/pull/1140). + ## [0.30.2] - 2026-01-26 ### Fixed diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index e9e843ca..14ec9c43 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -133,7 +133,7 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many WITH all_queues AS ( - SELECT unnest($1::text[])::text AS queue + SELECT DISTINCT unnest($1::text[])::text AS queue ), running_job_counts AS ( diff --git a/riverdriver/riverdrivertest/riverdrivertest.go b/riverdriver/riverdrivertest/riverdrivertest.go index 27aade13..9cd9a434 100644 --- a/riverdriver/riverdrivertest/riverdrivertest.go +++ b/riverdriver/riverdrivertest/riverdrivertest.go @@ -787,6 +787,56 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, require.Equal(t, int64(1), countsByQueue[1].CountAvailable) require.Equal(t, int64(1), countsByQueue[1].CountRunning) }) + + t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: []string{"queue1", "queue2"}, + Schema: "", + }) + require.NoError(t, err) + + require.Len(t, countsByQueue, 2) + + require.Equal(t, "queue1", countsByQueue[0].Queue) + require.Equal(t, int64(0), countsByQueue[0].CountAvailable) + require.Equal(t, int64(0), countsByQueue[0].CountRunning) + + require.Equal(t, "queue2", countsByQueue[1].Queue) + require.Equal(t, int64(1), countsByQueue[1].CountAvailable) + require.Equal(t, int64(1), countsByQueue[1].CountRunning) + }) + + t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{ + QueueNames: []string{"queue2", "queue1", "queue1"}, + Schema: "", + }) + require.NoError(t, err) + + require.Len(t, countsByQueue, 2) + + require.Equal(t, "queue1", countsByQueue[0].Queue) + require.Equal(t, int64(0), countsByQueue[0].CountAvailable) + require.Equal(t, int64(0), countsByQueue[0].CountRunning) + + require.Equal(t, "queue2", countsByQueue[1].Queue) + require.Equal(t, int64(1), countsByQueue[1].CountAvailable) + require.Equal(t, int64(1), countsByQueue[1].CountRunning) + }) }) t.Run("JobCountByState", func(t *testing.T) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 95a2493e..dde85de4 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -87,7 +87,7 @@ GROUP BY state; -- name: JobCountByQueueAndState :many WITH all_queues AS ( - SELECT unnest(@queue_names::text[])::text AS queue + SELECT DISTINCT unnest(@queue_names::text[])::text AS queue ), running_job_counts AS ( diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 25b2a95a..a86a2235 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -130,7 +130,7 @@ func (q *Queries) JobCountByAllStates(ctx context.Context, db DBTX) ([]*JobCount const jobCountByQueueAndState = `-- name: JobCountByQueueAndState :many WITH all_queues AS ( - SELECT unnest($1::text[])::text AS queue + SELECT DISTINCT unnest($1::text[])::text AS queue ), running_job_counts AS ( diff --git a/riverdriver/riversqlite/river_sqlite_driver.go b/riverdriver/riversqlite/river_sqlite_driver.go index 7e6af4bc..05b29489 100644 --- a/riverdriver/riversqlite/river_sqlite_driver.go +++ b/riverdriver/riversqlite/river_sqlite_driver.go @@ -295,14 +295,46 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdri if err != nil { return nil, interpretError(err) } - results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows)) - for i, row := range rows { - results[i] = &riverdriver.JobCountByQueueAndStateResult{ + + // The PostgreSQL drivers implement this query with an `all_queues` CTE and + // LEFT JOINs, so they return one row per requested queue, including queues + // that currently have no jobs. The input queue list is deduplicated in SQL. + // The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`, + // and we haven't found a workable way to bind a parameterized list through + // `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore + // returns only queues with matching rows, and this wrapper fills in missing + // queues to match PostgreSQL behavior. + countsByQueue := make(map[string]struct { + CountAvailable int64 + CountRunning int64 + }, len(rows)) + for _, row := range rows { + countsByQueue[row.Queue] = struct { + CountAvailable int64 + CountRunning int64 + }{ CountAvailable: row.CountAvailable, CountRunning: row.CountRunning, - Queue: row.Queue, } } + + queueNames := slices.Clone(params.QueueNames) + slices.Sort(queueNames) + queueNames = slices.Compact(queueNames) + + results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames)) + for _, queueName := range queueNames { + result := &riverdriver.JobCountByQueueAndStateResult{ + Queue: queueName, + } + if counts, ok := countsByQueue[queueName]; ok { + result.CountAvailable = counts.CountAvailable + result.CountRunning = counts.CountRunning + } + + results = append(results, result) + } + return results, nil }