diff --git a/cmd/wfctl/dsl-reference-embedded.md b/cmd/wfctl/dsl-reference-embedded.md index 874b89a9..e0ae16d5 100644 --- a/cmd/wfctl/dsl-reference-embedded.md +++ b/cmd/wfctl/dsl-reference-embedded.md @@ -1322,6 +1322,7 @@ Manage SQLite-backed engine config schema migrations. - `wfctl config migrate apply [--db workflow.db]` — apply pending migrations - `wfctl config migrate plugins [--config workflow.yaml]` — migrate requires.plugins[] entries - `wfctl config migrate repair-dirty [options]` — repair dirty golang-migrate metadata +- `wfctl migrations up --config app.yaml --env prod` — apply application database migrations through the configured migration plugin and verify post-up status is clean > **Deprecated:** `wfctl migrate` is an alias for `wfctl config migrate` and will be removed in v0.21+. > Update scripts to use `wfctl config migrate`. diff --git a/cmd/wfctl/migrations.go b/cmd/wfctl/migrations.go index da95908a..414fc225 100644 --- a/cmd/wfctl/migrations.go +++ b/cmd/wfctl/migrations.go @@ -55,13 +55,15 @@ type migrationRepairDirtyResult struct { func runMigrations(args []string) error { if len(args) == 0 { - return fmt.Errorf("usage: wfctl migrations ") + return fmt.Errorf("usage: wfctl migrations ") } switch args[0] { case "validate": return runMigrationsValidate(args[1:]) case "status": return runMigrationsStatus(args[1:]) + case "up": + return runMigrationsUp(args[1:]) case "ci-check": return runMigrationsCICheck(args[1:]) case "repair-dirty": @@ -107,6 +109,38 @@ func runMigrationsStatus(args []string) error { return nil } +func runMigrationsUp(args []string) error { + fs := flag.NewFlagSet("migrations up", flag.ContinueOnError) + configFile := fs.String("config", "app.yaml", "Workflow config file") + fs.StringVar(configFile, "c", "app.yaml", "Config file (short for --config)") + envName := fs.String("env", "", "Environment name") + pluginDir := fs.String("plugin-dir", defaultMigrationPluginDir(), "Plugin directory") + format := fs.String("format", "text", "Output format: text or json") + if err := fs.Parse(args); err != nil { + return err + } + + cfg, err := loadMigrationWorkflowConfig(*configFile) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + migrations, err := resolveMigrationConfigs(cfg, *envName) + if err != nil { + return err + } + result, err := applyMigrationsForConfigs(context.Background(), migrations, *pluginDir) + if writeErr := writeMigrationStatusOutput(result, *format, "migrations up"); writeErr != nil { + return writeErr + } + if err != nil { + return err + } + if result.Decision == "fail" { + return errors.New(strings.Join(result.Reasons, "; ")) + } + return nil +} + func runMigrationsCICheck(args []string) error { fs := flag.NewFlagSet("migrations ci-check", flag.ContinueOnError) configFile := fs.String("config", "app.yaml", "Workflow config file") @@ -567,6 +601,66 @@ func collectMigrationStatusForConfigs(ctx context.Context, migrations []resolved return result, errors.Join(errs...) } +func applyMigrationsForConfigs(ctx context.Context, migrations []resolvedMigrationConfig, pluginDir string) (migrationStatusResult, error) { + result := migrationStatusResult{ + Decision: "pass", + Destructive: false, + HumanApprovalRequired: false, + Migrations: make([]migrationValidationRecord, 0, len(migrations)), + } + runner := newMigrationPluginRunner() + var errs []error + for _, migration := range migrations { + runCfg := migrationPluginRunConfig{ + Plugin: migration.Plugin, + PluginDir: pluginDir, + Driver: migration.Driver, + SourceDir: migration.SourceDir, + DSN: migration.DSN, + } + record := migrationValidationRecord{Name: migration.Name, Driver: migration.Driver} + if _, err := runner.run(ctx, runCfg, "up"); err != nil { + reason := fmt.Sprintf("migration %s up failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN)) + result.Reasons = append(result.Reasons, reason) + record.Error = reason + result.Migrations = append(result.Migrations, record) + errs = append(errs, errors.New(reason)) + continue + } + statusOutput, err := runner.run(ctx, runCfg, "status") + if err != nil { + reason := fmt.Sprintf("migration %s post-up status failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN)) + result.Reasons = append(result.Reasons, reason) + record.Error = reason + result.Migrations = append(result.Migrations, record) + errs = append(errs, errors.New(reason)) + continue + } + status, err := parseMigrationStatus(statusOutput.Stdout) + if err != nil { + reason := fmt.Sprintf("migration %s post-up status failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN)) + result.Reasons = append(result.Reasons, reason) + record.Error = reason + result.Migrations = append(result.Migrations, record) + errs = append(errs, errors.New(reason)) + continue + } + record.Current = status.Current + record.Dirty = status.Dirty + record.Pending = status.Pending + for _, reason := range migrationStatusCleanReasons([]migrationValidationRecord{record}) { + reason = strings.Replace(reason, " is dirty ", " is dirty after up ", 1) + result.Reasons = append(result.Reasons, reason) + errs = append(errs, errors.New(reason)) + } + result.Migrations = append(result.Migrations, record) + } + if len(result.Reasons) > 0 { + result.Decision = "fail" + } + return result, errors.Join(errs...) +} + func migrationCurrentOrUnknown(current string) string { current = strings.TrimSpace(current) if current == "" { diff --git a/cmd/wfctl/migrations_baseline_test.go b/cmd/wfctl/migrations_baseline_test.go index c0747941..10f8438e 100644 --- a/cmd/wfctl/migrations_baseline_test.go +++ b/cmd/wfctl/migrations_baseline_test.go @@ -651,14 +651,21 @@ func stubMigrationBaselineHooks(t *testing.T, calls *[]string, changedFiles []st } func migrationCommandFromArgs(args []string) string { - if len(args) < 2 { + if len(args) == 0 { return "" } - if args[1] == "lint" { + offset := 0 + if args[0] == "--wfctl-cli" { + offset = 1 + } + if len(args) <= offset { + return "" + } + if args[offset] == "lint" { return "lint" } - command := []string{args[1]} - for i := 2; i < len(args); i++ { + command := []string{args[offset]} + for i := offset + 1; i < len(args); i++ { if args[i] == "--driver" || args[i] == "--source-dir" || args[i] == "--dsn" { break } @@ -668,8 +675,12 @@ func migrationCommandFromArgs(args []string) string { } func migrationSourceFromArgs(args []string) string { - if len(args) >= 3 && args[1] == "lint" { - return args[2] + offset := 0 + if len(args) > 0 && args[0] == "--wfctl-cli" { + offset = 1 + } + if len(args) > offset+1 && args[offset] == "lint" { + return args[offset+1] } return argValue(args, "--source-dir") } diff --git a/cmd/wfctl/migrations_plugin_runner.go b/cmd/wfctl/migrations_plugin_runner.go index dd70c5ce..8af24307 100644 --- a/cmd/wfctl/migrations_plugin_runner.go +++ b/cmd/wfctl/migrations_plugin_runner.go @@ -61,8 +61,7 @@ func (r migrationPluginRunner) runPluginArgs(ctx context.Context, cfg migrationP } func buildMigrationPluginArgs(cfg migrationPluginRunConfig, commandArgs []string) []string { - args := []string{"--wfctl-cli"} - args = append(args, commandArgs...) + args := append([]string(nil), commandArgs...) args = append(args, "--driver", cfg.Driver, "--source-dir", cfg.SourceDir, @@ -71,7 +70,7 @@ func buildMigrationPluginArgs(cfg migrationPluginRunConfig, commandArgs []string } func buildMigrationPluginLintArgs(cfg migrationPluginRunConfig) []string { - return []string{"--wfctl-cli", "lint", cfg.SourceDir} + return []string{"lint", cfg.SourceDir} } func buildMigrationPluginEnv(cfg migrationPluginRunConfig) map[string]string { diff --git a/cmd/wfctl/migrations_plugin_runner_test.go b/cmd/wfctl/migrations_plugin_runner_test.go index 7e433512..dc1c5a8a 100644 --- a/cmd/wfctl/migrations_plugin_runner_test.go +++ b/cmd/wfctl/migrations_plugin_runner_test.go @@ -41,7 +41,6 @@ func TestMigrationPluginRunnerBuildsWorkflowMigrateArgs(t *testing.T) { t.Fatalf("plugin = %q", gotPlugin) } wantArgs := []string{ - "--wfctl-cli", "test", "--driver", "golang-migrate", @@ -61,7 +60,7 @@ func TestBuildMigrationPluginArgsMatchPluginRootContract(t *testing.T) { Driver: "golang-migrate", SourceDir: "migrations", }, []string{"status"}) - want := []string{"--wfctl-cli", "status", "--driver", "golang-migrate", "--source-dir", "migrations"} + want := []string{"status", "--driver", "golang-migrate", "--source-dir", "migrations"} if !reflect.DeepEqual(got, want) { t.Fatalf("args = %#v, want %#v", got, want) } @@ -69,7 +68,7 @@ func TestBuildMigrationPluginArgsMatchPluginRootContract(t *testing.T) { func TestBuildMigrationPluginLintArgsMatchPluginContract(t *testing.T) { got := buildMigrationPluginLintArgs(migrationPluginRunConfig{SourceDir: "migrations"}) - want := []string{"--wfctl-cli", "lint", "migrations"} + want := []string{"lint", "migrations"} if !reflect.DeepEqual(got, want) { t.Fatalf("args = %#v, want %#v", got, want) } @@ -122,7 +121,7 @@ func TestDefaultMigrationPluginExecutorDoesNotInheritProcessSecrets(t *testing.T } t.Setenv("LEAKED_CI_SECRET", "must-not-reach-plugin") - _, err := defaultMigrationPluginExecutor(context.Background(), "workflow-plugin-migrations", []string{"--wfctl-cli", "migrate", "status"}, map[string]string{ + _, err := defaultMigrationPluginExecutor(context.Background(), "workflow-plugin-migrations", []string{"status", "--driver", "golang-migrate", "--source-dir", "migrations"}, map[string]string{ "DATABASE_URL": "postgres://user:secret@example.com/app", "WFCTL_PLUGIN_DIR": root, "PLUGIN_PUBLIC_VAR": "ok", diff --git a/cmd/wfctl/migrations_status_test.go b/cmd/wfctl/migrations_status_test.go index a304dd60..1bf485aa 100644 --- a/cmd/wfctl/migrations_status_test.go +++ b/cmd/wfctl/migrations_status_test.go @@ -186,7 +186,7 @@ func stubMigrationStatusRunner(t *testing.T, result migrationCommandResult, runE if pluginName != "workflow-plugin-migrations" { t.Fatalf("pluginName = %q", pluginName) } - if got := strings.Join(args, " "); !strings.Contains(got, "--wfctl-cli status") { + if got := strings.Join(args, " "); !strings.HasPrefix(got, "status ") { t.Fatalf("args = %q, want status", got) } if env["DATABASE_URL"] != "postgres://secret@example/db" { diff --git a/cmd/wfctl/migrations_up_test.go b/cmd/wfctl/migrations_up_test.go new file mode 100644 index 00000000..22e98ea5 --- /dev/null +++ b/cmd/wfctl/migrations_up_test.go @@ -0,0 +1,120 @@ +package main + +import ( + "context" + "encoding/json" + "reflect" + "strings" + "testing" +) + +func TestRunMigrationsUpAppliesAndVerifiesCleanStatus(t *testing.T) { + cfgPath := writeMigrationStatusConfig(t) + t.Setenv("DATABASE_URL", "postgres://secret@example/db") + var calls []string + restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{ + {}, + {Stdout: "Current: 20260426000002\nDirty: false\nNo pending migrations.\n"}, + }) + defer restore() + + if err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"}); err != nil { + t.Fatal(err) + } + + want := []string{"up", "status"} + if !reflect.DeepEqual(calls, want) { + t.Fatalf("calls = %#v, want %#v", calls, want) + } +} + +func TestRunMigrationsUpFailsWhenPostStatusIsDirty(t *testing.T) { + cfgPath := writeMigrationStatusConfig(t) + t.Setenv("DATABASE_URL", "postgres://secret@example/db") + var calls []string + restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{ + {}, + {Stdout: "Current: 20260426000002\nDirty: true\nNo pending migrations.\n"}, + }) + defer restore() + + err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"}) + if err == nil { + t.Fatal("runMigrations() error = nil; want dirty post-up failure") + } + if !strings.Contains(err.Error(), "dirty after up") { + t.Fatalf("runMigrations() error = %v; want dirty after up", err) + } +} + +func TestRunMigrationsUpFailsWhenPostStatusHasPendingMigrations(t *testing.T) { + cfgPath := writeMigrationStatusConfig(t) + t.Setenv("DATABASE_URL", "postgres://secret@example/db") + var calls []string + restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{ + {}, + {Stdout: "Current: 20260426000002\nDirty: false\nPending: 20260426000003\n"}, + }) + defer restore() + + err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"}) + if err == nil { + t.Fatal("runMigrations() error = nil; want pending migration failure") + } + if !strings.Contains(err.Error(), "pending migrations") { + t.Fatalf("runMigrations() error = %v; want pending migrations", err) + } +} + +func TestRunMigrationsUpJSONOutputRedactsDSN(t *testing.T) { + cfgPath := writeMigrationStatusConfig(t) + t.Setenv("DATABASE_URL", "postgres://secret@example/db") + var calls []string + restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{ + {}, + {Stdout: "Current: 20260426000002\nDirty: false\nNo pending migrations.\n"}, + }) + defer restore() + + out, err := captureStdout(t, func() error { + return runMigrations([]string{"up", "--config", cfgPath, "--env", "prod", "--format", "json"}) + }) + if err != nil { + t.Fatal(err) + } + if strings.Contains(out, "postgres://secret@example/db") { + t.Fatalf("output leaked DSN: %s", out) + } + var got migrationStatusResult + if err := json.Unmarshal([]byte(out), &got); err != nil { + t.Fatalf("decode JSON: %v\n%s", err, out) + } + if got.Decision != "pass" || len(got.Migrations) != 1 || got.Migrations[0].Current != "20260426000002" { + t.Fatalf("unexpected result: %+v", got) + } + if len(got.Migrations[0].Pending) != 0 { + t.Fatalf("pending = %#v", got.Migrations[0].Pending) + } +} + +func stubMigrationUpRunner(t *testing.T, calls *[]string, results []migrationCommandResult) func() { + t.Helper() + oldFactory := newMigrationPluginRunner + newMigrationPluginRunner = func() migrationPluginRunner { + return migrationPluginRunner{ + exec: func(_ context.Context, _ string, args []string, env map[string]string) (migrationCommandResult, error) { + if env["DATABASE_URL"] != "postgres://secret@example/db" { + t.Fatalf("DATABASE_URL = %q", env["DATABASE_URL"]) + } + *calls = append(*calls, migrationCommandFromArgs(args)) + if len(results) == 0 { + return migrationCommandResult{}, nil + } + result := results[0] + results = results[1:] + return result, nil + }, + } + } + return func() { newMigrationPluginRunner = oldFactory } +} diff --git a/cmd/wfctl/migrations_validate_test.go b/cmd/wfctl/migrations_validate_test.go index 71bb0d20..1549ee86 100644 --- a/cmd/wfctl/migrations_validate_test.go +++ b/cmd/wfctl/migrations_validate_test.go @@ -34,7 +34,7 @@ func TestRunMigrationsValidateRunsLintAndFreshCycle(t *testing.T) { if strings.Contains(strings.Join(args, " "), " test ") && env["DATABASE_URL"] != "postgres://ephemeral/app-fresh" { t.Fatalf("fresh cycle used DATABASE_URL = %q", env["DATABASE_URL"]) } - if strings.Contains(strings.Join(args, " "), " lint ") && env["DATABASE_URL"] != "" { + if len(args) > 0 && args[0] == "lint" && env["DATABASE_URL"] != "" { t.Fatalf("lint received DATABASE_URL = %q", env["DATABASE_URL"]) } return migrationCommandResult{Stdout: `{"current":"202604270001","dirty":false,"pending":[]}`}, nil @@ -49,9 +49,9 @@ func TestRunMigrationsValidateRunsLintAndFreshCycle(t *testing.T) { } want := []string{ - "workflow-plugin-migrations --wfctl-cli lint migrations ", + "workflow-plugin-migrations lint migrations ", "ephemeral app-fresh postgres://secret@example/db", - "workflow-plugin-migrations --wfctl-cli test --driver golang-migrate --source-dir migrations postgres://ephemeral/app-fresh", + "workflow-plugin-migrations test --driver golang-migrate --source-dir migrations postgres://ephemeral/app-fresh", "cleanup ephemeral app-fresh", } if !reflect.DeepEqual(calls, want) { @@ -122,7 +122,7 @@ func TestRunMigrationsValidateJSONOutputOnFailure(t *testing.T) { newMigrationPluginRunner = func() migrationPluginRunner { return migrationPluginRunner{ exec: func(_ context.Context, _ string, args []string, _ map[string]string) (migrationCommandResult, error) { - if strings.Contains(strings.Join(args, " "), " lint ") { + if len(args) > 0 && args[0] == "lint" { return migrationCommandResult{}, errors.New("lint failed for postgres://secret@example/db") } return migrationCommandResult{}, nil