diff --git a/riverdriver/riverdrivertest/migration.go b/riverdriver/riverdrivertest/migration.go index 1a1c17c1..0bbd5ae5 100644 --- a/riverdriver/riverdrivertest/migration.go +++ b/riverdriver/riverdrivertest/migration.go @@ -2,6 +2,7 @@ package riverdrivertest import ( "context" + "fmt" "slices" "strings" "testing" @@ -144,6 +145,77 @@ func exerciseMigration[TTx any](ctx context.Context, t *testing.T, } }) + t.Run("MigrateDownFromVersionSevenWithJobData", func(t *testing.T) { + t.Parallel() + + driver, schema := driverWithSchema(ctx, t, &riverdbtest.TestSchemaOpts{ + DisableReuse: true, + }) + exec := driver.GetExecutor() + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Schema: schema}) + + migrator, err := rivermigrate.New(driver, &rivermigrate.Config{ + Line: riverdriver.MigrationLineMain, + Logger: riversharedtest.Logger(t), + Schema: schema, + }) + require.NoError(t, err) + + _, err = migrator.Migrate(ctx, rivermigrate.DirectionDown, &rivermigrate.MigrateOpts{ + TargetVersion: 6, + }) + require.NoError(t, err) + + job, err = exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID, Schema: schema}) + require.NoError(t, err) + require.NotZero(t, job.ID) + }) + + t.Run("MigrateUpFromVersionSixWithQueueData", func(t *testing.T) { + t.Parallel() + + driver, schema := driverWithSchema(ctx, t, &riverdbtest.TestSchemaOpts{ + DisableReuse: true, + LineTargetVersions: map[string]int{ + riverdriver.MigrationLineMain: 6, + }, + }) + exec := driver.GetExecutor() + + queueTable := "river_queue" + if driver.DatabaseName() != riverdriver.DatabaseNameSQLite { + queueTable = schema + ".river_queue" + } + + _ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{ + Name: ptrutil.Ptr("default"), + Schema: schema, + }) + + migrator, err := rivermigrate.New(driver, &rivermigrate.Config{ + Line: riverdriver.MigrationLineMain, + Logger: riversharedtest.Logger(t), + Schema: schema, + }) + require.NoError(t, err) + + _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) + require.NoError(t, err) + + queue, err := exec.QueueGet(ctx, &riverdriver.QueueGetParams{Schema: schema, Name: "default"}) + require.NoError(t, err) + require.Equal(t, "default", queue.Name) + require.NotZero(t, queue.UpdatedAt) + + require.NoError(t, exec.Exec(ctx, fmt.Sprintf("INSERT INTO %s (name) VALUES ('queue-with-defaults')", queueTable))) + + queue, err = exec.QueueGet(ctx, &riverdriver.QueueGetParams{Schema: schema, Name: "queue-with-defaults"}) + require.NoError(t, err) + require.Equal(t, "queue-with-defaults", queue.Name) + require.NotZero(t, queue.UpdatedAt) + }) + type testBundle struct { driver riverdriver.Driver[TTx] } diff --git a/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.down.sql b/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.down.sql index e36cf848..24944ad8 100644 --- a/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.down.sql +++ b/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.down.sql @@ -30,43 +30,12 @@ CREATE TABLE /* TEMPLATE: schema */river_client_queue ( CONSTRAINT num_jobs_running_zero_or_positive CHECK (num_jobs_running >= 0) ); --- --- Revert addition of `DEFAULT 25` to `river_job.max_attempts`. --- - -ALTER TABLE /* TEMPLATE: schema */river_job - RENAME COLUMN max_attempts TO max_attempts_old; - -ALTER TABLE /* TEMPLATE: schema */river_job - ADD COLUMN max_attempts integer NOT NULL; - -UPDATE /* TEMPLATE: schema */river_job -SET max_attempts = max_attempts_old; - -ALTER TABLE /* TEMPLATE: schema */river_job - DROP COLUMN max_attempts_old; - --- --- Changes `river_queue.updated_at` to revert the default of `CURRENT_TIMESTAMP`. --- - -ALTER TABLE /* TEMPLATE: schema */river_queue - RENAME COLUMN updated_at TO updated_at_old; - -ALTER TABLE /* TEMPLATE: schema */river_queue - ADD COLUMN updated_at timestamp NOT NULL; - -UPDATE /* TEMPLATE: schema */river_queue -SET updated_at = updated_at_old; - -ALTER TABLE /* TEMPLATE: schema */river_queue - DROP COLUMN updated_at_old; - -- -- SQLite JSONB conversion rollback. -- -- Convert JSONB binary columns back to JSON text format and restore json() --- defaults. +-- defaults. The `river_job` rebuild also reverts the addition of `DEFAULT 25` +-- to `river_job.max_attempts`. -- -- SQLite doesn't allow `ALTER TABLE ADD COLUMN` with non-constant defaults like -- `json('{}')`, so rebuild each affected table instead. diff --git a/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.up.sql b/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.up.sql index aad44165..511aedeb 100644 --- a/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.up.sql +++ b/riverdriver/riversqlite/migration/main/007_notification_outbox_sqlite_jsonb_and_sql_cleanup.up.sql @@ -135,7 +135,7 @@ CREATE TABLE /* TEMPLATE: schema */river_queue ( created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, metadata blob NOT NULL DEFAULT (jsonb('{}')), paused_at timestamp, - updated_at timestamp NOT NULL + updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); INSERT INTO /* TEMPLATE: schema */river_queue ( @@ -259,19 +259,3 @@ SET max_attempts = max_attempts_old; ALTER TABLE /* TEMPLATE: schema */river_job DROP COLUMN max_attempts_old; - --- --- Changes `river_queue.updated_at` to have a default of `CURRENT_TIMESTAMP`. --- - -ALTER TABLE /* TEMPLATE: schema */river_queue - RENAME COLUMN updated_at TO updated_at_old; - -ALTER TABLE /* TEMPLATE: schema */river_queue - ADD COLUMN updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP; - -UPDATE /* TEMPLATE: schema */river_queue -SET updated_at = updated_at_old; - -ALTER TABLE /* TEMPLATE: schema */river_queue - DROP COLUMN updated_at_old;