Skip to content

Commit cb8347d

Browse files
committed
split riverdrivertest.go
Instead of a giant 5000 line file, split into smaller files grouped by entity. For job queries where we have many tests, break them into files by query type.
1 parent a8d8127 commit cb8347d

12 files changed

Lines changed: 5090 additions & 4757 deletions
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package riverdrivertest
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
9+
"github.com/riverqueue/river/riverdbtest"
10+
"github.com/riverqueue/river/riverdriver"
11+
"github.com/riverqueue/river/rivershared/testfactory"
12+
"github.com/riverqueue/river/rivershared/util/hashutil"
13+
"github.com/riverqueue/river/rivershared/util/randutil"
14+
"github.com/riverqueue/river/rivertype"
15+
)
16+
17+
func exerciseExecutorTx[TTx any](ctx context.Context, t *testing.T,
18+
driverWithSchema func(ctx context.Context, t *testing.T, opts *riverdbtest.TestSchemaOpts) (riverdriver.Driver[TTx], string),
19+
executorWithTx func(ctx context.Context, t *testing.T) (riverdriver.Executor, riverdriver.Driver[TTx]),
20+
) {
21+
t.Helper()
22+
23+
setup := func(ctx context.Context, t *testing.T) riverdriver.Executor {
24+
t.Helper()
25+
26+
exec, _ := executorWithTx(ctx, t)
27+
return exec
28+
}
29+
30+
t.Run("Begin", func(t *testing.T) {
31+
t.Parallel()
32+
33+
t.Run("BasicVisibility", func(t *testing.T) {
34+
t.Parallel()
35+
36+
exec := setup(ctx, t)
37+
38+
tx, err := exec.Begin(ctx)
39+
require.NoError(t, err)
40+
t.Cleanup(func() { _ = tx.Rollback(ctx) })
41+
42+
// Job visible in subtransaction, but not parent.
43+
{
44+
job := testfactory.Job(ctx, t, tx, &testfactory.JobOpts{})
45+
_ = testfactory.Job(ctx, t, tx, &testfactory.JobOpts{})
46+
47+
_, err := tx.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
48+
require.NoError(t, err)
49+
50+
require.NoError(t, tx.Rollback(ctx))
51+
52+
_, err = exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
53+
require.ErrorIs(t, err, rivertype.ErrNotFound)
54+
}
55+
})
56+
57+
t.Run("NestedTransactions", func(t *testing.T) {
58+
t.Parallel()
59+
60+
exec := setup(ctx, t)
61+
62+
tx1, err := exec.Begin(ctx)
63+
require.NoError(t, err)
64+
t.Cleanup(func() { _ = tx1.Rollback(ctx) })
65+
66+
// Job visible in tx1, but not top level executor.
67+
{
68+
job1 := testfactory.Job(ctx, t, tx1, &testfactory.JobOpts{})
69+
70+
{
71+
tx2, err := tx1.Begin(ctx)
72+
require.NoError(t, err)
73+
t.Cleanup(func() { _ = tx2.Rollback(ctx) })
74+
75+
// Job visible in tx2, but not top level executor.
76+
{
77+
job2 := testfactory.Job(ctx, t, tx2, &testfactory.JobOpts{})
78+
79+
_, err := tx2.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
80+
require.NoError(t, err)
81+
82+
require.NoError(t, tx2.Rollback(ctx))
83+
84+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
85+
require.ErrorIs(t, err, rivertype.ErrNotFound)
86+
}
87+
88+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job1.ID})
89+
require.NoError(t, err)
90+
}
91+
92+
// Repeat the same subtransaction again.
93+
{
94+
tx2, err := tx1.Begin(ctx)
95+
require.NoError(t, err)
96+
t.Cleanup(func() { _ = tx2.Rollback(ctx) })
97+
98+
job2 := testfactory.Job(ctx, t, tx2, &testfactory.JobOpts{})
99+
100+
_, err = tx2.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
101+
require.NoError(t, err)
102+
103+
require.NoError(t, tx2.Rollback(ctx))
104+
105+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job2.ID})
106+
require.ErrorIs(t, err, rivertype.ErrNotFound)
107+
}
108+
109+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job1.ID})
110+
require.NoError(t, err)
111+
}
112+
})
113+
114+
t.Run("RollbackAfterCommit", func(t *testing.T) {
115+
t.Parallel()
116+
117+
exec := setup(ctx, t)
118+
119+
tx1, err := exec.Begin(ctx)
120+
require.NoError(t, err)
121+
t.Cleanup(func() { _ = tx1.Rollback(ctx) })
122+
123+
tx2, err := tx1.Begin(ctx)
124+
require.NoError(t, err)
125+
t.Cleanup(func() { _ = tx2.Rollback(ctx) })
126+
127+
job := testfactory.Job(ctx, t, tx2, &testfactory.JobOpts{})
128+
129+
require.NoError(t, tx2.Commit(ctx))
130+
_ = tx2.Rollback(ctx) // "tx is closed" error generally returned, but don't require this
131+
132+
// Despite rollback being called after commit, the job is still
133+
// visible from the outer transaction.
134+
_, err = tx1.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID})
135+
require.NoError(t, err)
136+
})
137+
})
138+
139+
t.Run("Exec", func(t *testing.T) {
140+
t.Parallel()
141+
142+
t.Run("NoArgs", func(t *testing.T) {
143+
t.Parallel()
144+
145+
exec := setup(ctx, t)
146+
147+
require.NoError(t, exec.Exec(ctx, "SELECT 1 + 2"))
148+
})
149+
150+
t.Run("WithArgs", func(t *testing.T) {
151+
t.Parallel()
152+
153+
exec := setup(ctx, t)
154+
155+
require.NoError(t, exec.Exec(ctx, "SELECT $1 || $2", "foo", "bar"))
156+
})
157+
})
158+
159+
t.Run("PGAdvisoryXactLock", func(t *testing.T) {
160+
t.Parallel()
161+
162+
{
163+
driver, _ := driverWithSchema(ctx, t, nil)
164+
if driver.DatabaseName() == databaseNameSQLite {
165+
t.Logf("Skipping PGAdvisoryXactLock test for SQLite")
166+
return
167+
}
168+
}
169+
170+
exec := setup(ctx, t)
171+
172+
// It's possible for multiple versions of this test to be running at the
173+
// same time (from different drivers), so make sure the lock we're
174+
// acquiring per test is unique by using the complete test name. Also
175+
// add randomness in case a test is run multiple times with `-count`.
176+
lockHash := hashutil.NewAdvisoryLockHash(0)
177+
lockHash.Write([]byte(t.Name()))
178+
lockHash.Write([]byte(randutil.Hex(10)))
179+
key := lockHash.Key()
180+
181+
// Tries to acquire the given lock from another test transaction and
182+
// returns true if the lock was acquired.
183+
tryAcquireLock := func(exec riverdriver.Executor) bool {
184+
var lockAcquired bool
185+
require.NoError(t, exec.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", key).Scan(&lockAcquired))
186+
return lockAcquired
187+
}
188+
189+
// Start a transaction to acquire the lock so we can later release the
190+
// lock by rolling back.
191+
execTx, err := exec.Begin(ctx)
192+
require.NoError(t, err)
193+
194+
// Acquire the advisory lock on the main test transaction.
195+
_, err = execTx.PGAdvisoryXactLock(ctx, key)
196+
require.NoError(t, err)
197+
198+
// Start another test transaction unrelated to the first.
199+
otherExec, _ := executorWithTx(ctx, t)
200+
201+
// The other test transaction is unable to acquire the lock because the
202+
// first test transaction holds it.
203+
require.False(t, tryAcquireLock(otherExec))
204+
205+
// Roll back the first test transaction to release the lock.
206+
require.NoError(t, execTx.Rollback(ctx))
207+
208+
// The other test transaction can now acquire the lock.
209+
require.True(t, tryAcquireLock(otherExec))
210+
})
211+
212+
t.Run("QueryRow", func(t *testing.T) {
213+
t.Parallel()
214+
215+
exec := setup(ctx, t)
216+
217+
var (
218+
field1 int
219+
field2 int
220+
field3 int
221+
fieldFoo string
222+
)
223+
224+
err := exec.QueryRow(ctx, "SELECT 1, 2, 3, 'foo'").Scan(&field1, &field2, &field3, &fieldFoo)
225+
require.NoError(t, err)
226+
227+
require.Equal(t, 1, field1)
228+
require.Equal(t, 2, field2)
229+
require.Equal(t, 3, field3)
230+
require.Equal(t, "foo", fieldFoo)
231+
})
232+
}

0 commit comments

Comments
 (0)