Skip to content

Commit 4fdba79

Browse files
authored
JobCountByQueueAndState: fix empty queue parity, coverage (#1140)
`JobCountByQueueAndState` returned inconsistent results across drivers when callers requested queues with no matching jobs. PostgreSQL returned those requested queues with zero counts because the query materialized an `all_queues` CTE, while SQLite needed wrapper-level fill-in to produce the same rows. This change keeps the SQLite `sqlc` query for counting existing queues, but makes the wrapper complete missing requested queues so behavior is aligned. It also defines deduplicated queue-name semantics consistently: PostgreSQL now uses `DISTINCT` in `all_queues`, and SQLite deduplicates its sorted input in the wrapper to match. Driver conformance coverage now explicitly checks missing requested queues and deduplicated queue-name input for `JobCountByQueueAndState`, so parity stays enforced across drivers. Fixes riverqueue/riverui#496.
1 parent 77e55ba commit 4fdba79

6 files changed

Lines changed: 93 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
### Fixed
11+
12+
- `JobCountByQueueAndState` now returns consistent results across drivers, including requested queues with zero jobs, and deduplicates repeated queue names in input. This resolves an issue with the sqlite driver in River UI reported in [riverqueue/riverui#496](https://github.com/riverqueue/riverui#496). [PR #1140](https://github.com/riverqueue/river/pull/1140).
13+
1014
## [0.30.2] - 2026-01-26
1115

1216
### Fixed

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdrivertest/riverdrivertest.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,56 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
787787
require.Equal(t, int64(1), countsByQueue[1].CountAvailable)
788788
require.Equal(t, int64(1), countsByQueue[1].CountRunning)
789789
})
790+
791+
t.Run("IncludesRequestedQueuesThatHaveNoJobs", func(t *testing.T) {
792+
t.Parallel()
793+
794+
exec, _ := setup(ctx, t)
795+
796+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)})
797+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)})
798+
799+
countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{
800+
QueueNames: []string{"queue1", "queue2"},
801+
Schema: "",
802+
})
803+
require.NoError(t, err)
804+
805+
require.Len(t, countsByQueue, 2)
806+
807+
require.Equal(t, "queue1", countsByQueue[0].Queue)
808+
require.Equal(t, int64(0), countsByQueue[0].CountAvailable)
809+
require.Equal(t, int64(0), countsByQueue[0].CountRunning)
810+
811+
require.Equal(t, "queue2", countsByQueue[1].Queue)
812+
require.Equal(t, int64(1), countsByQueue[1].CountAvailable)
813+
require.Equal(t, int64(1), countsByQueue[1].CountRunning)
814+
})
815+
816+
t.Run("InputQueueNamesAreDeduplicated", func(t *testing.T) {
817+
t.Parallel()
818+
819+
exec, _ := setup(ctx, t)
820+
821+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateAvailable)})
822+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue2"), State: ptrutil.Ptr(rivertype.JobStateRunning)})
823+
824+
countsByQueue, err := exec.JobCountByQueueAndState(ctx, &riverdriver.JobCountByQueueAndStateParams{
825+
QueueNames: []string{"queue2", "queue1", "queue1"},
826+
Schema: "",
827+
})
828+
require.NoError(t, err)
829+
830+
require.Len(t, countsByQueue, 2)
831+
832+
require.Equal(t, "queue1", countsByQueue[0].Queue)
833+
require.Equal(t, int64(0), countsByQueue[0].CountAvailable)
834+
require.Equal(t, int64(0), countsByQueue[0].CountRunning)
835+
836+
require.Equal(t, "queue2", countsByQueue[1].Queue)
837+
require.Equal(t, int64(1), countsByQueue[1].CountAvailable)
838+
require.Equal(t, int64(1), countsByQueue[1].CountRunning)
839+
})
790840
})
791841

792842
t.Run("JobCountByState", func(t *testing.T) {

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ GROUP BY state;
8787

8888
-- name: JobCountByQueueAndState :many
8989
WITH all_queues AS (
90-
SELECT unnest(@queue_names::text[])::text AS queue
90+
SELECT DISTINCT unnest(@queue_names::text[])::text AS queue
9191
),
9292

9393
running_job_counts AS (

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riversqlite/river_sqlite_driver.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,14 +295,46 @@ func (e *Executor) JobCountByQueueAndState(ctx context.Context, params *riverdri
295295
if err != nil {
296296
return nil, interpretError(err)
297297
}
298-
results := make([]*riverdriver.JobCountByQueueAndStateResult, len(rows))
299-
for i, row := range rows {
300-
results[i] = &riverdriver.JobCountByQueueAndStateResult{
298+
299+
// The PostgreSQL drivers implement this query with an `all_queues` CTE and
300+
// LEFT JOINs, so they return one row per requested queue, including queues
301+
// that currently have no jobs. The input queue list is deduplicated in SQL.
302+
// The SQLite sqlc driver only reliably supports `sqlc.slice` in `IN (...)`,
303+
// and we haven't found a workable way to bind a parameterized list through
304+
// `json_each(...)` to produce equivalent SQL. The SQLite SQL query therefore
305+
// returns only queues with matching rows, and this wrapper fills in missing
306+
// queues to match PostgreSQL behavior.
307+
countsByQueue := make(map[string]struct {
308+
CountAvailable int64
309+
CountRunning int64
310+
}, len(rows))
311+
for _, row := range rows {
312+
countsByQueue[row.Queue] = struct {
313+
CountAvailable int64
314+
CountRunning int64
315+
}{
301316
CountAvailable: row.CountAvailable,
302317
CountRunning: row.CountRunning,
303-
Queue: row.Queue,
304318
}
305319
}
320+
321+
queueNames := slices.Clone(params.QueueNames)
322+
slices.Sort(queueNames)
323+
queueNames = slices.Compact(queueNames)
324+
325+
results := make([]*riverdriver.JobCountByQueueAndStateResult, 0, len(queueNames))
326+
for _, queueName := range queueNames {
327+
result := &riverdriver.JobCountByQueueAndStateResult{
328+
Queue: queueName,
329+
}
330+
if counts, ok := countsByQueue[queueName]; ok {
331+
result.CountAvailable = counts.CountAvailable
332+
result.CountRunning = counts.CountRunning
333+
}
334+
335+
results = append(results, result)
336+
}
337+
306338
return results, nil
307339
}
308340

0 commit comments

Comments
 (0)