Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/wfctl/dsl-reference-embedded.md
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,7 @@ Manage SQLite-backed engine config schema migrations.
- `wfctl config migrate apply [--db workflow.db]` — apply pending migrations
- `wfctl config migrate plugins [--config workflow.yaml]` — migrate requires.plugins[] entries
- `wfctl config migrate repair-dirty [options]` — repair dirty golang-migrate metadata
- `wfctl migrations up --config app.yaml --env prod` — apply application database migrations through the configured migration plugin and verify post-up status is clean

> **Deprecated:** `wfctl migrate` is an alias for `wfctl config migrate` and will be removed in v0.21+.
> Update scripts to use `wfctl config migrate`.
Expand Down
96 changes: 95 additions & 1 deletion cmd/wfctl/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ type migrationRepairDirtyResult struct {

func runMigrations(args []string) error {
if len(args) == 0 {
return fmt.Errorf("usage: wfctl migrations <validate|status|ci-check|repair-dirty>")
return fmt.Errorf("usage: wfctl migrations <validate|status|up|ci-check|repair-dirty>")
}
switch args[0] {
case "validate":
return runMigrationsValidate(args[1:])
case "status":
return runMigrationsStatus(args[1:])
case "up":
return runMigrationsUp(args[1:])
case "ci-check":
return runMigrationsCICheck(args[1:])
case "repair-dirty":
Expand Down Expand Up @@ -107,6 +109,38 @@ func runMigrationsStatus(args []string) error {
return nil
}

func runMigrationsUp(args []string) error {
fs := flag.NewFlagSet("migrations up", flag.ContinueOnError)
configFile := fs.String("config", "app.yaml", "Workflow config file")
fs.StringVar(configFile, "c", "app.yaml", "Config file (short for --config)")
envName := fs.String("env", "", "Environment name")
pluginDir := fs.String("plugin-dir", defaultMigrationPluginDir(), "Plugin directory")
format := fs.String("format", "text", "Output format: text or json")
if err := fs.Parse(args); err != nil {
return err
}

cfg, err := loadMigrationWorkflowConfig(*configFile)
if err != nil {
return fmt.Errorf("load config: %w", err)
}
migrations, err := resolveMigrationConfigs(cfg, *envName)
if err != nil {
return err
}
result, err := applyMigrationsForConfigs(context.Background(), migrations, *pluginDir)
if writeErr := writeMigrationStatusOutput(result, *format, "migrations up"); writeErr != nil {
return writeErr
}
if err != nil {
return err
}
if result.Decision == "fail" {
return errors.New(strings.Join(result.Reasons, "; "))
}
return nil
}

func runMigrationsCICheck(args []string) error {
fs := flag.NewFlagSet("migrations ci-check", flag.ContinueOnError)
configFile := fs.String("config", "app.yaml", "Workflow config file")
Expand Down Expand Up @@ -567,6 +601,66 @@ func collectMigrationStatusForConfigs(ctx context.Context, migrations []resolved
return result, errors.Join(errs...)
}

func applyMigrationsForConfigs(ctx context.Context, migrations []resolvedMigrationConfig, pluginDir string) (migrationStatusResult, error) {
result := migrationStatusResult{
Decision: "pass",
Destructive: false,
HumanApprovalRequired: false,
Migrations: make([]migrationValidationRecord, 0, len(migrations)),
}
runner := newMigrationPluginRunner()
var errs []error
for _, migration := range migrations {
runCfg := migrationPluginRunConfig{
Plugin: migration.Plugin,
PluginDir: pluginDir,
Driver: migration.Driver,
SourceDir: migration.SourceDir,
DSN: migration.DSN,
}
record := migrationValidationRecord{Name: migration.Name, Driver: migration.Driver}
if _, err := runner.run(ctx, runCfg, "up"); err != nil {
reason := fmt.Sprintf("migration %s up failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN))
result.Reasons = append(result.Reasons, reason)
record.Error = reason
result.Migrations = append(result.Migrations, record)
errs = append(errs, errors.New(reason))
continue
}
statusOutput, err := runner.run(ctx, runCfg, "status")
if err != nil {
reason := fmt.Sprintf("migration %s post-up status failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN))
result.Reasons = append(result.Reasons, reason)
record.Error = reason
result.Migrations = append(result.Migrations, record)
errs = append(errs, errors.New(reason))
continue
}
status, err := parseMigrationStatus(statusOutput.Stdout)
if err != nil {
reason := fmt.Sprintf("migration %s post-up status failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN))
result.Reasons = append(result.Reasons, reason)
record.Error = reason
result.Migrations = append(result.Migrations, record)
errs = append(errs, errors.New(reason))
continue
}
record.Current = status.Current
record.Dirty = status.Dirty
record.Pending = status.Pending
for _, reason := range migrationStatusCleanReasons([]migrationValidationRecord{record}) {
reason = strings.Replace(reason, " is dirty ", " is dirty after up ", 1)
result.Reasons = append(result.Reasons, reason)
errs = append(errs, errors.New(reason))
}
result.Migrations = append(result.Migrations, record)
}
if len(result.Reasons) > 0 {
result.Decision = "fail"
}
return result, errors.Join(errs...)
}

func migrationCurrentOrUnknown(current string) string {
current = strings.TrimSpace(current)
if current == "" {
Expand Down
23 changes: 17 additions & 6 deletions cmd/wfctl/migrations_baseline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,14 +651,21 @@ func stubMigrationBaselineHooks(t *testing.T, calls *[]string, changedFiles []st
}

func migrationCommandFromArgs(args []string) string {
if len(args) < 2 {
if len(args) == 0 {
return ""
}
if args[1] == "lint" {
offset := 0
if args[0] == "--wfctl-cli" {
offset = 1
}
if len(args) <= offset {
return ""
}
if args[offset] == "lint" {
return "lint"
}
command := []string{args[1]}
for i := 2; i < len(args); i++ {
command := []string{args[offset]}
for i := offset + 1; i < len(args); i++ {
if args[i] == "--driver" || args[i] == "--source-dir" || args[i] == "--dsn" {
break
}
Expand All @@ -668,8 +675,12 @@ func migrationCommandFromArgs(args []string) string {
}

func migrationSourceFromArgs(args []string) string {
if len(args) >= 3 && args[1] == "lint" {
return args[2]
offset := 0
if len(args) > 0 && args[0] == "--wfctl-cli" {
offset = 1
}
if len(args) > offset+1 && args[offset] == "lint" {
return args[offset+1]
}
return argValue(args, "--source-dir")
}
Expand Down
5 changes: 2 additions & 3 deletions cmd/wfctl/migrations_plugin_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func (r migrationPluginRunner) runPluginArgs(ctx context.Context, cfg migrationP
}

func buildMigrationPluginArgs(cfg migrationPluginRunConfig, commandArgs []string) []string {
args := []string{"--wfctl-cli"}
args = append(args, commandArgs...)
args := append([]string(nil), commandArgs...)
args = append(args,
"--driver", cfg.Driver,
"--source-dir", cfg.SourceDir,
Expand All @@ -71,7 +70,7 @@ func buildMigrationPluginArgs(cfg migrationPluginRunConfig, commandArgs []string
}

func buildMigrationPluginLintArgs(cfg migrationPluginRunConfig) []string {
return []string{"--wfctl-cli", "lint", cfg.SourceDir}
return []string{"lint", cfg.SourceDir}
}

func buildMigrationPluginEnv(cfg migrationPluginRunConfig) map[string]string {
Expand Down
7 changes: 3 additions & 4 deletions cmd/wfctl/migrations_plugin_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func TestMigrationPluginRunnerBuildsWorkflowMigrateArgs(t *testing.T) {
t.Fatalf("plugin = %q", gotPlugin)
}
wantArgs := []string{
"--wfctl-cli",
"test",
"--driver",
"golang-migrate",
Expand All @@ -61,15 +60,15 @@ func TestBuildMigrationPluginArgsMatchPluginRootContract(t *testing.T) {
Driver: "golang-migrate",
SourceDir: "migrations",
}, []string{"status"})
want := []string{"--wfctl-cli", "status", "--driver", "golang-migrate", "--source-dir", "migrations"}
want := []string{"status", "--driver", "golang-migrate", "--source-dir", "migrations"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("args = %#v, want %#v", got, want)
}
}

func TestBuildMigrationPluginLintArgsMatchPluginContract(t *testing.T) {
got := buildMigrationPluginLintArgs(migrationPluginRunConfig{SourceDir: "migrations"})
want := []string{"--wfctl-cli", "lint", "migrations"}
want := []string{"lint", "migrations"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("args = %#v, want %#v", got, want)
}
Expand Down Expand Up @@ -122,7 +121,7 @@ func TestDefaultMigrationPluginExecutorDoesNotInheritProcessSecrets(t *testing.T
}
t.Setenv("LEAKED_CI_SECRET", "must-not-reach-plugin")

_, err := defaultMigrationPluginExecutor(context.Background(), "workflow-plugin-migrations", []string{"--wfctl-cli", "migrate", "status"}, map[string]string{
_, err := defaultMigrationPluginExecutor(context.Background(), "workflow-plugin-migrations", []string{"status", "--driver", "golang-migrate", "--source-dir", "migrations"}, map[string]string{
"DATABASE_URL": "postgres://user:secret@example.com/app",
"WFCTL_PLUGIN_DIR": root,
"PLUGIN_PUBLIC_VAR": "ok",
Expand Down
2 changes: 1 addition & 1 deletion cmd/wfctl/migrations_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func stubMigrationStatusRunner(t *testing.T, result migrationCommandResult, runE
if pluginName != "workflow-plugin-migrations" {
t.Fatalf("pluginName = %q", pluginName)
}
if got := strings.Join(args, " "); !strings.Contains(got, "--wfctl-cli status") {
if got := strings.Join(args, " "); !strings.HasPrefix(got, "status ") {
t.Fatalf("args = %q, want status", got)
}
if env["DATABASE_URL"] != "postgres://secret@example/db" {
Expand Down
120 changes: 120 additions & 0 deletions cmd/wfctl/migrations_up_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"context"
"encoding/json"
"reflect"
"strings"
"testing"
)

func TestRunMigrationsUpAppliesAndVerifiesCleanStatus(t *testing.T) {
cfgPath := writeMigrationStatusConfig(t)
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
var calls []string
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
{},
{Stdout: "Current: 20260426000002\nDirty: false\nNo pending migrations.\n"},
})
defer restore()

if err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"}); err != nil {
t.Fatal(err)
}

want := []string{"up", "status"}
if !reflect.DeepEqual(calls, want) {
t.Fatalf("calls = %#v, want %#v", calls, want)
}
}

func TestRunMigrationsUpFailsWhenPostStatusIsDirty(t *testing.T) {
cfgPath := writeMigrationStatusConfig(t)
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
var calls []string
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
{},
{Stdout: "Current: 20260426000002\nDirty: true\nNo pending migrations.\n"},
})
defer restore()

err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"})
if err == nil {
t.Fatal("runMigrations() error = nil; want dirty post-up failure")
}
if !strings.Contains(err.Error(), "dirty after up") {
t.Fatalf("runMigrations() error = %v; want dirty after up", err)
}
}

func TestRunMigrationsUpFailsWhenPostStatusHasPendingMigrations(t *testing.T) {
cfgPath := writeMigrationStatusConfig(t)
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
var calls []string
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
{},
{Stdout: "Current: 20260426000002\nDirty: false\nPending: 20260426000003\n"},
})
defer restore()

err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"})
if err == nil {
t.Fatal("runMigrations() error = nil; want pending migration failure")
}
if !strings.Contains(err.Error(), "pending migrations") {
t.Fatalf("runMigrations() error = %v; want pending migrations", err)
}
}

func TestRunMigrationsUpJSONOutputRedactsDSN(t *testing.T) {
cfgPath := writeMigrationStatusConfig(t)
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
var calls []string
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
{},
{Stdout: "Current: 20260426000002\nDirty: false\nNo pending migrations.\n"},
})
defer restore()

out, err := captureStdout(t, func() error {
return runMigrations([]string{"up", "--config", cfgPath, "--env", "prod", "--format", "json"})
})
if err != nil {
t.Fatal(err)
}
if strings.Contains(out, "postgres://secret@example/db") {
t.Fatalf("output leaked DSN: %s", out)
}
var got migrationStatusResult
if err := json.Unmarshal([]byte(out), &got); err != nil {
t.Fatalf("decode JSON: %v\n%s", err, out)
}
if got.Decision != "pass" || len(got.Migrations) != 1 || got.Migrations[0].Current != "20260426000002" {
t.Fatalf("unexpected result: %+v", got)
}
if len(got.Migrations[0].Pending) != 0 {
t.Fatalf("pending = %#v", got.Migrations[0].Pending)
}
}

func stubMigrationUpRunner(t *testing.T, calls *[]string, results []migrationCommandResult) func() {
t.Helper()
oldFactory := newMigrationPluginRunner
newMigrationPluginRunner = func() migrationPluginRunner {
return migrationPluginRunner{
exec: func(_ context.Context, _ string, args []string, env map[string]string) (migrationCommandResult, error) {
if env["DATABASE_URL"] != "postgres://secret@example/db" {
t.Fatalf("DATABASE_URL = %q", env["DATABASE_URL"])
}
*calls = append(*calls, migrationCommandFromArgs(args))
if len(results) == 0 {
return migrationCommandResult{}, nil
}
result := results[0]
results = results[1:]
return result, nil
},
}
}
return func() { newMigrationPluginRunner = oldFactory }
}
8 changes: 4 additions & 4 deletions cmd/wfctl/migrations_validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestRunMigrationsValidateRunsLintAndFreshCycle(t *testing.T) {
if strings.Contains(strings.Join(args, " "), " test ") && env["DATABASE_URL"] != "postgres://ephemeral/app-fresh" {
t.Fatalf("fresh cycle used DATABASE_URL = %q", env["DATABASE_URL"])
}
if strings.Contains(strings.Join(args, " "), " lint ") && env["DATABASE_URL"] != "" {
if len(args) > 0 && args[0] == "lint" && env["DATABASE_URL"] != "" {
t.Fatalf("lint received DATABASE_URL = %q", env["DATABASE_URL"])
}
return migrationCommandResult{Stdout: `{"current":"202604270001","dirty":false,"pending":[]}`}, nil
Expand All @@ -49,9 +49,9 @@ func TestRunMigrationsValidateRunsLintAndFreshCycle(t *testing.T) {
}

want := []string{
"workflow-plugin-migrations --wfctl-cli lint migrations ",
"workflow-plugin-migrations lint migrations ",
"ephemeral app-fresh postgres://secret@example/db",
"workflow-plugin-migrations --wfctl-cli test --driver golang-migrate --source-dir migrations postgres://ephemeral/app-fresh",
"workflow-plugin-migrations test --driver golang-migrate --source-dir migrations postgres://ephemeral/app-fresh",
"cleanup ephemeral app-fresh",
}
if !reflect.DeepEqual(calls, want) {
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestRunMigrationsValidateJSONOutputOnFailure(t *testing.T) {
newMigrationPluginRunner = func() migrationPluginRunner {
return migrationPluginRunner{
exec: func(_ context.Context, _ string, args []string, _ map[string]string) (migrationCommandResult, error) {
if strings.Contains(strings.Join(args, " "), " lint ") {
if len(args) > 0 && args[0] == "lint" {
return migrationCommandResult{}, errors.New("lint failed for postgres://secret@example/db")
}
return migrationCommandResult{}, nil
Expand Down
Loading