Skip to content

Commit 8a82340

Browse files
authored
feat: add wfctl migrations up (#755)
1 parent 940e5de commit 8a82340

8 files changed

Lines changed: 243 additions & 19 deletions

cmd/wfctl/dsl-reference-embedded.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,6 +1322,7 @@ Manage SQLite-backed engine config schema migrations.
13221322
- `wfctl config migrate apply [--db workflow.db]` — apply pending migrations
13231323
- `wfctl config migrate plugins [--config workflow.yaml]` — migrate requires.plugins[] entries
13241324
- `wfctl config migrate repair-dirty [options]` — repair dirty golang-migrate metadata
1325+
- `wfctl migrations up --config app.yaml --env prod` — apply application database migrations through the configured migration plugin and verify post-up status is clean
13251326

13261327
> **Deprecated:** `wfctl migrate` is an alias for `wfctl config migrate` and will be removed in v0.21+.
13271328
> Update scripts to use `wfctl config migrate`.

cmd/wfctl/migrations.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@ type migrationRepairDirtyResult struct {
5555

5656
func runMigrations(args []string) error {
5757
if len(args) == 0 {
58-
return fmt.Errorf("usage: wfctl migrations <validate|status|ci-check|repair-dirty>")
58+
return fmt.Errorf("usage: wfctl migrations <validate|status|up|ci-check|repair-dirty>")
5959
}
6060
switch args[0] {
6161
case "validate":
6262
return runMigrationsValidate(args[1:])
6363
case "status":
6464
return runMigrationsStatus(args[1:])
65+
case "up":
66+
return runMigrationsUp(args[1:])
6567
case "ci-check":
6668
return runMigrationsCICheck(args[1:])
6769
case "repair-dirty":
@@ -107,6 +109,38 @@ func runMigrationsStatus(args []string) error {
107109
return nil
108110
}
109111

112+
func runMigrationsUp(args []string) error {
113+
fs := flag.NewFlagSet("migrations up", flag.ContinueOnError)
114+
configFile := fs.String("config", "app.yaml", "Workflow config file")
115+
fs.StringVar(configFile, "c", "app.yaml", "Config file (short for --config)")
116+
envName := fs.String("env", "", "Environment name")
117+
pluginDir := fs.String("plugin-dir", defaultMigrationPluginDir(), "Plugin directory")
118+
format := fs.String("format", "text", "Output format: text or json")
119+
if err := fs.Parse(args); err != nil {
120+
return err
121+
}
122+
123+
cfg, err := loadMigrationWorkflowConfig(*configFile)
124+
if err != nil {
125+
return fmt.Errorf("load config: %w", err)
126+
}
127+
migrations, err := resolveMigrationConfigs(cfg, *envName)
128+
if err != nil {
129+
return err
130+
}
131+
result, err := applyMigrationsForConfigs(context.Background(), migrations, *pluginDir)
132+
if writeErr := writeMigrationStatusOutput(result, *format, "migrations up"); writeErr != nil {
133+
return writeErr
134+
}
135+
if err != nil {
136+
return err
137+
}
138+
if result.Decision == "fail" {
139+
return errors.New(strings.Join(result.Reasons, "; "))
140+
}
141+
return nil
142+
}
143+
110144
func runMigrationsCICheck(args []string) error {
111145
fs := flag.NewFlagSet("migrations ci-check", flag.ContinueOnError)
112146
configFile := fs.String("config", "app.yaml", "Workflow config file")
@@ -567,6 +601,66 @@ func collectMigrationStatusForConfigs(ctx context.Context, migrations []resolved
567601
return result, errors.Join(errs...)
568602
}
569603

604+
func applyMigrationsForConfigs(ctx context.Context, migrations []resolvedMigrationConfig, pluginDir string) (migrationStatusResult, error) {
605+
result := migrationStatusResult{
606+
Decision: "pass",
607+
Destructive: false,
608+
HumanApprovalRequired: false,
609+
Migrations: make([]migrationValidationRecord, 0, len(migrations)),
610+
}
611+
runner := newMigrationPluginRunner()
612+
var errs []error
613+
for _, migration := range migrations {
614+
runCfg := migrationPluginRunConfig{
615+
Plugin: migration.Plugin,
616+
PluginDir: pluginDir,
617+
Driver: migration.Driver,
618+
SourceDir: migration.SourceDir,
619+
DSN: migration.DSN,
620+
}
621+
record := migrationValidationRecord{Name: migration.Name, Driver: migration.Driver}
622+
if _, err := runner.run(ctx, runCfg, "up"); err != nil {
623+
reason := fmt.Sprintf("migration %s up failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN))
624+
result.Reasons = append(result.Reasons, reason)
625+
record.Error = reason
626+
result.Migrations = append(result.Migrations, record)
627+
errs = append(errs, errors.New(reason))
628+
continue
629+
}
630+
statusOutput, err := runner.run(ctx, runCfg, "status")
631+
if err != nil {
632+
reason := fmt.Sprintf("migration %s post-up status failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN))
633+
result.Reasons = append(result.Reasons, reason)
634+
record.Error = reason
635+
result.Migrations = append(result.Migrations, record)
636+
errs = append(errs, errors.New(reason))
637+
continue
638+
}
639+
status, err := parseMigrationStatus(statusOutput.Stdout)
640+
if err != nil {
641+
reason := fmt.Sprintf("migration %s post-up status failed: %s", migration.Name, redactMigrationDSN(err.Error(), migration.DSN))
642+
result.Reasons = append(result.Reasons, reason)
643+
record.Error = reason
644+
result.Migrations = append(result.Migrations, record)
645+
errs = append(errs, errors.New(reason))
646+
continue
647+
}
648+
record.Current = status.Current
649+
record.Dirty = status.Dirty
650+
record.Pending = status.Pending
651+
for _, reason := range migrationStatusCleanReasons([]migrationValidationRecord{record}) {
652+
reason = strings.Replace(reason, " is dirty ", " is dirty after up ", 1)
653+
result.Reasons = append(result.Reasons, reason)
654+
errs = append(errs, errors.New(reason))
655+
}
656+
result.Migrations = append(result.Migrations, record)
657+
}
658+
if len(result.Reasons) > 0 {
659+
result.Decision = "fail"
660+
}
661+
return result, errors.Join(errs...)
662+
}
663+
570664
func migrationCurrentOrUnknown(current string) string {
571665
current = strings.TrimSpace(current)
572666
if current == "" {

cmd/wfctl/migrations_baseline_test.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -651,14 +651,21 @@ func stubMigrationBaselineHooks(t *testing.T, calls *[]string, changedFiles []st
651651
}
652652

653653
func migrationCommandFromArgs(args []string) string {
654-
if len(args) < 2 {
654+
if len(args) == 0 {
655655
return ""
656656
}
657-
if args[1] == "lint" {
657+
offset := 0
658+
if args[0] == "--wfctl-cli" {
659+
offset = 1
660+
}
661+
if len(args) <= offset {
662+
return ""
663+
}
664+
if args[offset] == "lint" {
658665
return "lint"
659666
}
660-
command := []string{args[1]}
661-
for i := 2; i < len(args); i++ {
667+
command := []string{args[offset]}
668+
for i := offset + 1; i < len(args); i++ {
662669
if args[i] == "--driver" || args[i] == "--source-dir" || args[i] == "--dsn" {
663670
break
664671
}
@@ -668,8 +675,12 @@ func migrationCommandFromArgs(args []string) string {
668675
}
669676

670677
func migrationSourceFromArgs(args []string) string {
671-
if len(args) >= 3 && args[1] == "lint" {
672-
return args[2]
678+
offset := 0
679+
if len(args) > 0 && args[0] == "--wfctl-cli" {
680+
offset = 1
681+
}
682+
if len(args) > offset+1 && args[offset] == "lint" {
683+
return args[offset+1]
673684
}
674685
return argValue(args, "--source-dir")
675686
}

cmd/wfctl/migrations_plugin_runner.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ func (r migrationPluginRunner) runPluginArgs(ctx context.Context, cfg migrationP
6161
}
6262

6363
func buildMigrationPluginArgs(cfg migrationPluginRunConfig, commandArgs []string) []string {
64-
args := []string{"--wfctl-cli"}
65-
args = append(args, commandArgs...)
64+
args := append([]string(nil), commandArgs...)
6665
args = append(args,
6766
"--driver", cfg.Driver,
6867
"--source-dir", cfg.SourceDir,
@@ -71,7 +70,7 @@ func buildMigrationPluginArgs(cfg migrationPluginRunConfig, commandArgs []string
7170
}
7271

7372
func buildMigrationPluginLintArgs(cfg migrationPluginRunConfig) []string {
74-
return []string{"--wfctl-cli", "lint", cfg.SourceDir}
73+
return []string{"lint", cfg.SourceDir}
7574
}
7675

7776
func buildMigrationPluginEnv(cfg migrationPluginRunConfig) map[string]string {

cmd/wfctl/migrations_plugin_runner_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ func TestMigrationPluginRunnerBuildsWorkflowMigrateArgs(t *testing.T) {
4141
t.Fatalf("plugin = %q", gotPlugin)
4242
}
4343
wantArgs := []string{
44-
"--wfctl-cli",
4544
"test",
4645
"--driver",
4746
"golang-migrate",
@@ -61,15 +60,15 @@ func TestBuildMigrationPluginArgsMatchPluginRootContract(t *testing.T) {
6160
Driver: "golang-migrate",
6261
SourceDir: "migrations",
6362
}, []string{"status"})
64-
want := []string{"--wfctl-cli", "status", "--driver", "golang-migrate", "--source-dir", "migrations"}
63+
want := []string{"status", "--driver", "golang-migrate", "--source-dir", "migrations"}
6564
if !reflect.DeepEqual(got, want) {
6665
t.Fatalf("args = %#v, want %#v", got, want)
6766
}
6867
}
6968

7069
func TestBuildMigrationPluginLintArgsMatchPluginContract(t *testing.T) {
7170
got := buildMigrationPluginLintArgs(migrationPluginRunConfig{SourceDir: "migrations"})
72-
want := []string{"--wfctl-cli", "lint", "migrations"}
71+
want := []string{"lint", "migrations"}
7372
if !reflect.DeepEqual(got, want) {
7473
t.Fatalf("args = %#v, want %#v", got, want)
7574
}
@@ -122,7 +121,7 @@ func TestDefaultMigrationPluginExecutorDoesNotInheritProcessSecrets(t *testing.T
122121
}
123122
t.Setenv("LEAKED_CI_SECRET", "must-not-reach-plugin")
124123

125-
_, err := defaultMigrationPluginExecutor(context.Background(), "workflow-plugin-migrations", []string{"--wfctl-cli", "migrate", "status"}, map[string]string{
124+
_, err := defaultMigrationPluginExecutor(context.Background(), "workflow-plugin-migrations", []string{"status", "--driver", "golang-migrate", "--source-dir", "migrations"}, map[string]string{
126125
"DATABASE_URL": "postgres://user:secret@example.com/app",
127126
"WFCTL_PLUGIN_DIR": root,
128127
"PLUGIN_PUBLIC_VAR": "ok",

cmd/wfctl/migrations_status_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func stubMigrationStatusRunner(t *testing.T, result migrationCommandResult, runE
186186
if pluginName != "workflow-plugin-migrations" {
187187
t.Fatalf("pluginName = %q", pluginName)
188188
}
189-
if got := strings.Join(args, " "); !strings.Contains(got, "--wfctl-cli status") {
189+
if got := strings.Join(args, " "); !strings.HasPrefix(got, "status ") {
190190
t.Fatalf("args = %q, want status", got)
191191
}
192192
if env["DATABASE_URL"] != "postgres://secret@example/db" {

cmd/wfctl/migrations_up_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"reflect"
7+
"strings"
8+
"testing"
9+
)
10+
11+
func TestRunMigrationsUpAppliesAndVerifiesCleanStatus(t *testing.T) {
12+
cfgPath := writeMigrationStatusConfig(t)
13+
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
14+
var calls []string
15+
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
16+
{},
17+
{Stdout: "Current: 20260426000002\nDirty: false\nNo pending migrations.\n"},
18+
})
19+
defer restore()
20+
21+
if err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"}); err != nil {
22+
t.Fatal(err)
23+
}
24+
25+
want := []string{"up", "status"}
26+
if !reflect.DeepEqual(calls, want) {
27+
t.Fatalf("calls = %#v, want %#v", calls, want)
28+
}
29+
}
30+
31+
func TestRunMigrationsUpFailsWhenPostStatusIsDirty(t *testing.T) {
32+
cfgPath := writeMigrationStatusConfig(t)
33+
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
34+
var calls []string
35+
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
36+
{},
37+
{Stdout: "Current: 20260426000002\nDirty: true\nNo pending migrations.\n"},
38+
})
39+
defer restore()
40+
41+
err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"})
42+
if err == nil {
43+
t.Fatal("runMigrations() error = nil; want dirty post-up failure")
44+
}
45+
if !strings.Contains(err.Error(), "dirty after up") {
46+
t.Fatalf("runMigrations() error = %v; want dirty after up", err)
47+
}
48+
}
49+
50+
func TestRunMigrationsUpFailsWhenPostStatusHasPendingMigrations(t *testing.T) {
51+
cfgPath := writeMigrationStatusConfig(t)
52+
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
53+
var calls []string
54+
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
55+
{},
56+
{Stdout: "Current: 20260426000002\nDirty: false\nPending: 20260426000003\n"},
57+
})
58+
defer restore()
59+
60+
err := runMigrations([]string{"up", "--config", cfgPath, "--env", "prod"})
61+
if err == nil {
62+
t.Fatal("runMigrations() error = nil; want pending migration failure")
63+
}
64+
if !strings.Contains(err.Error(), "pending migrations") {
65+
t.Fatalf("runMigrations() error = %v; want pending migrations", err)
66+
}
67+
}
68+
69+
func TestRunMigrationsUpJSONOutputRedactsDSN(t *testing.T) {
70+
cfgPath := writeMigrationStatusConfig(t)
71+
t.Setenv("DATABASE_URL", "postgres://secret@example/db")
72+
var calls []string
73+
restore := stubMigrationUpRunner(t, &calls, []migrationCommandResult{
74+
{},
75+
{Stdout: "Current: 20260426000002\nDirty: false\nNo pending migrations.\n"},
76+
})
77+
defer restore()
78+
79+
out, err := captureStdout(t, func() error {
80+
return runMigrations([]string{"up", "--config", cfgPath, "--env", "prod", "--format", "json"})
81+
})
82+
if err != nil {
83+
t.Fatal(err)
84+
}
85+
if strings.Contains(out, "postgres://secret@example/db") {
86+
t.Fatalf("output leaked DSN: %s", out)
87+
}
88+
var got migrationStatusResult
89+
if err := json.Unmarshal([]byte(out), &got); err != nil {
90+
t.Fatalf("decode JSON: %v\n%s", err, out)
91+
}
92+
if got.Decision != "pass" || len(got.Migrations) != 1 || got.Migrations[0].Current != "20260426000002" {
93+
t.Fatalf("unexpected result: %+v", got)
94+
}
95+
if len(got.Migrations[0].Pending) != 0 {
96+
t.Fatalf("pending = %#v", got.Migrations[0].Pending)
97+
}
98+
}
99+
100+
func stubMigrationUpRunner(t *testing.T, calls *[]string, results []migrationCommandResult) func() {
101+
t.Helper()
102+
oldFactory := newMigrationPluginRunner
103+
newMigrationPluginRunner = func() migrationPluginRunner {
104+
return migrationPluginRunner{
105+
exec: func(_ context.Context, _ string, args []string, env map[string]string) (migrationCommandResult, error) {
106+
if env["DATABASE_URL"] != "postgres://secret@example/db" {
107+
t.Fatalf("DATABASE_URL = %q", env["DATABASE_URL"])
108+
}
109+
*calls = append(*calls, migrationCommandFromArgs(args))
110+
if len(results) == 0 {
111+
return migrationCommandResult{}, nil
112+
}
113+
result := results[0]
114+
results = results[1:]
115+
return result, nil
116+
},
117+
}
118+
}
119+
return func() { newMigrationPluginRunner = oldFactory }
120+
}

cmd/wfctl/migrations_validate_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestRunMigrationsValidateRunsLintAndFreshCycle(t *testing.T) {
3434
if strings.Contains(strings.Join(args, " "), " test ") && env["DATABASE_URL"] != "postgres://ephemeral/app-fresh" {
3535
t.Fatalf("fresh cycle used DATABASE_URL = %q", env["DATABASE_URL"])
3636
}
37-
if strings.Contains(strings.Join(args, " "), " lint ") && env["DATABASE_URL"] != "" {
37+
if len(args) > 0 && args[0] == "lint" && env["DATABASE_URL"] != "" {
3838
t.Fatalf("lint received DATABASE_URL = %q", env["DATABASE_URL"])
3939
}
4040
return migrationCommandResult{Stdout: `{"current":"202604270001","dirty":false,"pending":[]}`}, nil
@@ -49,9 +49,9 @@ func TestRunMigrationsValidateRunsLintAndFreshCycle(t *testing.T) {
4949
}
5050

5151
want := []string{
52-
"workflow-plugin-migrations --wfctl-cli lint migrations ",
52+
"workflow-plugin-migrations lint migrations ",
5353
"ephemeral app-fresh postgres://secret@example/db",
54-
"workflow-plugin-migrations --wfctl-cli test --driver golang-migrate --source-dir migrations postgres://ephemeral/app-fresh",
54+
"workflow-plugin-migrations test --driver golang-migrate --source-dir migrations postgres://ephemeral/app-fresh",
5555
"cleanup ephemeral app-fresh",
5656
}
5757
if !reflect.DeepEqual(calls, want) {
@@ -122,7 +122,7 @@ func TestRunMigrationsValidateJSONOutputOnFailure(t *testing.T) {
122122
newMigrationPluginRunner = func() migrationPluginRunner {
123123
return migrationPluginRunner{
124124
exec: func(_ context.Context, _ string, args []string, _ map[string]string) (migrationCommandResult, error) {
125-
if strings.Contains(strings.Join(args, " "), " lint ") {
125+
if len(args) > 0 && args[0] == "lint" {
126126
return migrationCommandResult{}, errors.New("lint failed for postgres://secret@example/db")
127127
}
128128
return migrationCommandResult{}, nil

0 commit comments

Comments
 (0)