From 19d2600fa9078df8b4105c8d6a410c9993f15742 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 18 May 2026 17:54:03 -0400 Subject: [PATCH 1/2] fix: tighten wfctl validation --- cmd/wfctl/infra.go | 30 ++++- cmd/wfctl/infra_test.go | 15 +++ cmd/wfctl/main_test.go | 72 ++++++++++-- cmd/wfctl/plugin_install_lockfile_test.go | 81 +++++++++++++ cmd/wfctl/plugin_install_wfctllock.go | 85 +++++++++++++- cmd/wfctl/validate.go | 46 +++++++- docs/WFCTL.md | 6 +- schema/step_inference.go | 136 +++++++++++++++++++++- schema/step_schema_builtins.go | 11 +- schema/step_schema_test.go | 35 ++++++ validation/pipeline_refs.go | 58 ++++++++- validation/pipeline_refs_test.go | 107 +++++++++++++++++ 12 files changed, 652 insertions(+), 30 deletions(-) diff --git a/cmd/wfctl/infra.go b/cmd/wfctl/infra.go index 4b703570..c1582772 100644 --- a/cmd/wfctl/infra.go +++ b/cmd/wfctl/infra.go @@ -356,7 +356,7 @@ func runInfraPlan(args []string) error { fmt.Println() fmt.Println("Pending JIT resolution (apply-time):") for _, d := range resolutionDiags { - fmt.Printf(" %s: ${%s}\n", d.ResourceName, d.Ref) + fmt.Printf(" %s: %s\n", d.ResourceName, formatResolutionDiagnosticRef(d.Ref)) } } @@ -688,6 +688,34 @@ func formatPlanMarkdown(plan interfaces.IaCPlan, showSensitive bool) string { return sb.String() } +func formatResolutionDiagnosticRef(ref string) string { + if isSensitiveResolutionRef(ref) { + return "" + } + return "${" + ref + "}" +} + +func isSensitiveResolutionRef(ref string) bool { + ref = strings.ToLower(ref) + if ref == "" { + return false + } + parts := strings.Split(ref, ".") + last := parts[len(parts)-1] + for _, sensitiveKey := range secrets.DefaultSensitiveKeys() { + k := strings.ToLower(sensitiveKey) + if ref == k || last == k { + return true + } + } + for _, token := range []string{"secret", "token", "password", "passwd", "pwd", "private", "credential", "dsn", "uri", "url"} { + if strings.Contains(ref, token) { + return true + } + } + return false +} + // resourceSummaryKeys returns the most relevant key-value pairs to display for // a given resource type. Each entry is a [key, value] pair. Sensitive keys are // masked as "(sensitive)" unless showSensitive is true. diff --git a/cmd/wfctl/infra_test.go b/cmd/wfctl/infra_test.go index 22ba3fa3..381fb5e2 100644 --- a/cmd/wfctl/infra_test.go +++ b/cmd/wfctl/infra_test.go @@ -311,6 +311,21 @@ func TestFormatPlanTable_MasksSensitiveInDefaultMode(t *testing.T) { } } +func TestFormatResolutionDiagnosticRefRedactsSensitiveRefs(t *testing.T) { + for _, ref := range []string{"JWT_SECRET", "STRIPE_SECRET_KEY", "DATABASE_URL", "bmw-database.uri"} { + got := formatResolutionDiagnosticRef(ref) + if strings.Contains(got, ref) || strings.Contains(got, "${") { + t.Fatalf("formatResolutionDiagnosticRef(%q) = %q, want redacted", ref, got) + } + } + for _, ref := range []string{"IMAGE_SHA", "bmw-database.id"} { + got := formatResolutionDiagnosticRef(ref) + if got != "${"+ref+"}" { + t.Fatalf("formatResolutionDiagnosticRef(%q) = %q, want literal ref", ref, got) + } + } +} + // --- helpers --- func writeTempYAML(t *testing.T, content string) (string, error) { diff --git a/cmd/wfctl/main_test.go b/cmd/wfctl/main_test.go index 43c92e12..3ea0483b 100644 --- a/cmd/wfctl/main_test.go +++ b/cmd/wfctl/main_test.go @@ -144,13 +144,65 @@ func TestRunValidateInvalid(t *testing.T) { } } -func TestRunValidateStrict(t *testing.T) { +func TestRunValidateStrictByDefault(t *testing.T) { dir := t.TempDir() emptyConfig := "modules: []\n" path := writeTestConfig(t, dir, "empty.yaml", emptyConfig) - err := runValidate([]string{"-strict", path}) + err := runValidate([]string{path}) if err == nil { - t.Fatal("expected error in strict mode with empty modules") + t.Fatal("expected error by default with empty modules") + } +} + +func TestRunValidateLooseAllowsEmptyModules(t *testing.T) { + dir := t.TempDir() + emptyConfig := "modules: []\n" + path := writeTestConfig(t, dir, "empty.yaml", emptyConfig) + if err := runValidate([]string{"--loose", path}); err != nil { + t.Fatalf("expected --loose to allow empty modules, got: %v", err) + } + if err := runValidate([]string{"--non-strict", path}); err != nil { + t.Fatalf("expected --non-strict to allow empty modules, got: %v", err) + } +} + +func TestRunValidateCatchesDBQueryCachedRowWrapperByDefault(t *testing.T) { + dir := t.TempDir() + cfg := ` +modules: + - name: router + type: http.router +pipelines: + payment-create-intent: + trigger: + type: http + config: + path: /api/v1/payments/intents + method: POST + steps: + - name: check_mock_mode + type: step.db_query_cached + config: + database: db + query: "SELECT COALESCE((SELECT settings->>'mock_payments' FROM tenants WHERE id = $1), 'false') AS mock_payments" + mode: single + cache_key: tenant:test:mock_payments + - name: set_mock_flag + type: step.set + config: + values: + is_mock: '{{ index .steps "check_mock_mode" "row" "mock_payments" | default "false" }}' +` + path := writeTestConfig(t, dir, "payment.yaml", cfg) + err := runValidate([]string{path}) + if err == nil { + t.Fatal("expected validate to fail on stale db_query_cached row wrapper") + } + if !strings.Contains(err.Error(), "pipeline-refs warning") || !strings.Contains(err.Error(), "check_mock_mode.row") { + t.Fatalf("validate error should mention pipeline refs and check_mock_mode.row, got: %v", err) + } + if err := runValidate([]string{"--loose", path}); err != nil { + t.Fatalf("--loose should allow transitional pipeline reference warnings, got: %v", err) } } @@ -252,12 +304,12 @@ modules: ` path := writeTestConfig(t, dir, "custom.yaml", unknownTypeConfig) // Should fail without the flag - err := runValidate([]string{path}) + err := runValidate([]string{"--allow-no-entry-points", path}) if err == nil { t.Fatal("expected error for unknown type") } // Should pass with the flag - if err := runValidate([]string{"--skip-unknown-types", path}); err != nil { + if err := runValidate([]string{"--skip-unknown-types", "--allow-no-entry-points", path}); err != nil { t.Fatalf("expected pass with --skip-unknown-types, got: %v", err) } } @@ -382,12 +434,12 @@ modules: path := writeTestConfig(t, dir, "workflow.yaml", configContent) // Without --plugin-dir: should fail (unknown type) - if err := runValidate([]string{path}); err == nil { + if err := runValidate([]string{"--allow-no-entry-points", path}); err == nil { t.Fatal("expected error for unknown external module type without --plugin-dir") } // With --plugin-dir: should pass - if err := runValidate([]string{"--plugin-dir", pluginsDir, path}); err != nil { + if err := runValidate([]string{"--plugin-dir", pluginsDir, "--allow-no-entry-points", path}); err != nil { t.Errorf("expected valid config with --plugin-dir, got: %v", err) } t.Cleanup(func() { @@ -414,12 +466,12 @@ func TestRunValidatePluginDirCapabilities(t *testing.T) { path := writeTestConfig(t, dir, "workflow.yaml", configContent) // Without --plugin-dir: should fail (unknown type) - if err := runValidate([]string{path}); err == nil { + if err := runValidate([]string{"--allow-no-entry-points", path}); err == nil { t.Fatal("expected error for unknown external module type without --plugin-dir") } // With --plugin-dir: should pass (types from capabilities object are recognized) - if err := runValidate([]string{"--plugin-dir", pluginsDir, path}); err != nil { + if err := runValidate([]string{"--plugin-dir", pluginsDir, "--allow-no-entry-points", path}); err != nil { t.Errorf("expected valid config with --plugin-dir (capabilities format), got: %v", err) } t.Cleanup(func() { @@ -462,7 +514,7 @@ modules: - name: ext-mod type: custom.step_schema_validate_testonly `) - if err := runValidate([]string{"--plugin-dir", pluginsDir, path}); err != nil { + if err := runValidate([]string{"--plugin-dir", pluginsDir, "--allow-no-entry-points", path}); err != nil { t.Fatalf("expected valid config with --plugin-dir, got: %v", err) } if got := reg.Get("step.schema_validate_testonly"); got == nil { diff --git a/cmd/wfctl/plugin_install_lockfile_test.go b/cmd/wfctl/plugin_install_lockfile_test.go index 5020fad4..f649ff48 100644 --- a/cmd/wfctl/plugin_install_lockfile_test.go +++ b/cmd/wfctl/plugin_install_lockfile_test.go @@ -145,6 +145,87 @@ plugins: } } +func TestInstallFromWfctlLockfile_UsesCachedInstallWhenLockMetadataMatches(t *testing.T) { + dir := t.TempDir() + lockPath := filepath.Join(dir, ".wfctl-lock.yaml") + pluginDir := filepath.Join(dir, "plugins") + if err := os.MkdirAll(pluginDir, 0o755); err != nil { + t.Fatal(err) + } + + origWD, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + if err := os.Chdir(dir); err != nil { + t.Fatalf("chdir: %v", err) + } + t.Cleanup(func() { os.Chdir(origWD) }) //nolint:errcheck + + var downloadHits atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + downloadHits.Add(1) + http.Error(w, "cache should satisfy lockfile install", http.StatusInternalServerError) + })) + defer srv.Close() + + const pluginName = "auth" + installDir := filepath.Join(pluginDir, pluginName) + if err := os.MkdirAll(installDir, 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(installDir, pluginName), []byte("#!/bin/sh\necho cached auth\n"), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(installDir, "plugin.json"), minimalPluginJSON(pluginName, "v1.2.3"), 0o644); err != nil { + t.Fatal(err) + } + + plat := config.WfctlLockPlatform{ + URL: srv.URL + "/workflow-plugin-auth-" + currentPlatformKey() + ".tar.gz", + SHA256: strings.Repeat("a", 64), + } + entry := config.WfctlLockPluginEntry{ + Version: "v1.2.3", + Source: "github.com/GoCodeAlone/workflow-plugin-auth", + Platforms: map[string]config.WfctlLockPlatform{ + currentPlatformKey(): plat, + }, + } + meta := lockfileInstallMetadata{ + Version: entry.Version, + Source: entry.Source, + Platform: currentPlatformKey(), + URL: plat.URL, + SHA256: plat.SHA256, + } + metaData, err := json.MarshalIndent(meta, "", " ") + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(installDir, lockfileInstallMetadataName), append(metaData, '\n'), 0o600); err != nil { + t.Fatal(err) + } + + lf := &config.WfctlLockfile{ + Version: 1, + GeneratedAt: time.Now(), + Plugins: map[string]config.WfctlLockPluginEntry{ + "workflow-plugin-auth": entry, + }, + } + if err := config.SaveWfctlLockfile(lockPath, lf); err != nil { + t.Fatal(err) + } + + if err := installFromWfctlLockfile(pluginDir, lockPath, lf); err != nil { + t.Fatalf("installFromWfctlLockfile should reuse cached plugin matching lock metadata: %v", err) + } + if got := downloadHits.Load(); got != 0 { + t.Fatalf("download endpoint was hit %d times; cached install should satisfy lockfile", got) + } +} + func TestInstallFromWfctlLockfile_ScrubsExplicitEmptyTopLevelSHA256(t *testing.T) { dir := t.TempDir() lockPath := filepath.Join(dir, ".wfctl-lock.yaml") diff --git a/cmd/wfctl/plugin_install_wfctllock.go b/cmd/wfctl/plugin_install_wfctllock.go index 9dd74633..1b8e67c8 100644 --- a/cmd/wfctl/plugin_install_wfctllock.go +++ b/cmd/wfctl/plugin_install_wfctllock.go @@ -1,8 +1,10 @@ package main import ( + "encoding/json" "fmt" "os" + "path/filepath" "runtime" "sort" "strings" @@ -46,7 +48,6 @@ func installFromWfctlLockfile(pluginDirVal, lockPath string, lf *config.WfctlLoc var failed []string for _, name := range names { entry := lf.Plugins[name] - fmt.Fprintf(os.Stderr, "Installing %s@%s...\n", name, entry.Version) installed := false @@ -66,11 +67,32 @@ func installFromWfctlLockfile(pluginDirVal, lockPath string, lf *config.WfctlLoc failed = append(failed, errMsg) continue } + if cached, err := installedPluginSatisfiesLock(pluginDirVal, name, entry, platKey, plat); err != nil { + fmt.Fprintf(os.Stderr, "warning: cached install for %s is not reusable: %v\n", name, err) + } else if cached { + fmt.Fprintf(os.Stderr, "Using cached %s@%s from %s\n", name, entry.Version, filepath.Join(pluginDirVal, normalizePluginName(name))) + installed = true + } + } + + if installed { + continue + } + + fmt.Fprintf(os.Stderr, "Installing %s@%s...\n", name, entry.Version) + + if len(entry.Platforms) > 0 { + plat := entry.Platforms[platKey] if err := installFromURL(plat.URL, pluginDirVal, plat.SHA256, false); err != nil { fmt.Fprintf(os.Stderr, "error installing %s from URL: %v\n", name, err) failed = append(failed, fmt.Sprintf("%s (%v)", name, err)) continue } + if err := writeLockfileInstallMetadata(pluginDirVal, name, entry, platKey, plat); err != nil { + fmt.Fprintf(os.Stderr, "error recording install metadata for %s: %v\n", name, err) + failed = append(failed, fmt.Sprintf("%s (%v)", name, err)) + continue + } installed = true } @@ -120,3 +142,64 @@ func scrubbedWfctlLockfileTopLevelSHA256(lf *config.WfctlLockfile) *config.Wfctl func currentPlatformKey() string { return fmt.Sprintf("%s-%s", runtime.GOOS, runtime.GOARCH) } + +const lockfileInstallMetadataName = ".wfctl-install.json" + +type lockfileInstallMetadata struct { + Version string `json:"version"` + Source string `json:"source,omitempty"` + Platform string `json:"platform"` + URL string `json:"url"` + SHA256 string `json:"sha256"` +} + +func installedPluginSatisfiesLock(pluginDir, lockName string, entry config.WfctlLockPluginEntry, platform string, plat config.WfctlLockPlatform) (bool, error) { + installName := normalizePluginName(lockName) + installDir := filepath.Join(pluginDir, installName) + if _, err := os.Stat(installDir); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, fmt.Errorf("stat %s: %w", installDir, err) + } + if err := verifyInstalledPlugin(installDir, installName); err != nil { + return false, err + } + if err := verifyInstalledVersion(installDir, entry.Version); err != nil { + return false, err + } + + data, err := os.ReadFile(filepath.Join(installDir, lockfileInstallMetadataName)) + if os.IsNotExist(err) { + return false, nil + } + if err != nil { + return false, fmt.Errorf("read install metadata: %w", err) + } + var meta lockfileInstallMetadata + if err := json.Unmarshal(data, &meta); err != nil { + return false, fmt.Errorf("parse install metadata: %w", err) + } + if !samePluginVersion(meta.Version, entry.Version) { + return false, nil + } + if meta.Source != entry.Source || meta.Platform != platform || meta.URL != plat.URL || !strings.EqualFold(meta.SHA256, plat.SHA256) { + return false, nil + } + return true, nil +} + +func writeLockfileInstallMetadata(pluginDir, lockName string, entry config.WfctlLockPluginEntry, platform string, plat config.WfctlLockPlatform) error { + installDir := filepath.Join(pluginDir, normalizePluginName(lockName)) + meta := lockfileInstallMetadata{ + Version: entry.Version, + Source: entry.Source, + Platform: platform, + URL: plat.URL, + SHA256: strings.ToLower(plat.SHA256), + } + data, err := json.MarshalIndent(meta, "", " ") + if err != nil { + return err + } + return os.WriteFile(filepath.Join(installDir, lockfileInstallMetadataName), append(data, '\n'), 0o600) +} diff --git a/cmd/wfctl/validate.go b/cmd/wfctl/validate.go index f883700d..0a2ff0c5 100644 --- a/cmd/wfctl/validate.go +++ b/cmd/wfctl/validate.go @@ -12,12 +12,15 @@ import ( "github.com/GoCodeAlone/workflow/internal/legacyaws" "github.com/GoCodeAlone/workflow/internal/legacydo" "github.com/GoCodeAlone/workflow/schema" + "github.com/GoCodeAlone/workflow/validation" "gopkg.in/yaml.v3" ) func runValidate(args []string) error { fs := flag.NewFlagSet("validate", flag.ContinueOnError) - strict := fs.Bool("strict", false, "Enable strict validation (no empty modules allowed)") + strict := fs.Bool("strict", true, "Enable strict validation (default; retained for compatibility)") + loose := fs.Bool("loose", false, "Allow legacy loose validation for transitional configs (planned for removal in v1.0)") + nonStrict := fs.Bool("non-strict", false, "Alias for --loose") skipUnknownTypes := fs.Bool("skip-unknown-types", false, "Skip unknown module/workflow/trigger type checks") allowNoEntryPoints := fs.Bool("allow-no-entry-points", false, "Allow configs with no entry points (triggers, routes, subscriptions, jobs)") dir := fs.String("dir", "", "Validate all .yaml/.yml files in a directory (recursive)") @@ -31,7 +34,7 @@ Examples: wfctl validate config.yaml wfctl validate example/*.yaml wfctl validate --dir ./example/ - wfctl validate --strict admin/config.yaml + wfctl validate --loose legacy/config.yaml wfctl validate --skip-unknown-types example/*.yaml wfctl validate --plugin-dir data/plugins config.yaml @@ -47,6 +50,9 @@ Options: if err := fs.Parse(args); err != nil { return err } + if *loose || *nonStrict { + *strict = false + } // Load external plugin types before validation so their module/trigger/workflow // types are recognised and don't cause false "unknown type" errors. @@ -234,11 +240,47 @@ func validateFile(cfgPath string, strict, skipUnknownTypes, allowNoEntryPoints b fmt.Fprintf(os.Stderr, " WARN %s: %s\n", cfgPath, warn) } + if cfg.Pipelines != nil { + if refs := validation.ValidatePipelineTemplateRefs(cfg.Pipelines); refs.HasIssues() { + var findings, blocking []string + for _, w := range refs.Warnings { + msg := "pipeline-refs warning: " + w + findings = append(findings, msg) + if isBlockingPipelineRefWarning(w) { + blocking = append(blocking, msg) + } + if !strict || !isBlockingPipelineRefWarning(w) { + fmt.Fprintf(os.Stderr, " WARN %s: %s\n", cfgPath, msg) + } + } + for _, e := range refs.Errors { + findings = append(findings, "pipeline-refs error: "+e) + } + if len(refs.Errors) > 0 { + return fmt.Errorf("%s", strings.Join(findings, "\n")) + } + if strict && len(blocking) > 0 { + return fmt.Errorf("%s", strings.Join(blocking, "\n")) + } + } + } + fmt.Printf(" PASS %s (%d modules, %d workflows, %d triggers)\n", cfgPath, len(cfg.Modules), len(cfg.Workflows), len(cfg.Triggers)) return nil } +func isBlockingPipelineRefWarning(warning string) bool { + if strings.Contains(warning, "hyphenated dot-access") { + return false + } + return strings.Contains(warning, "does not exist in this pipeline") || + strings.Contains(warning, "has not executed yet") || + strings.Contains(warning, "references itself") || + strings.Contains(warning, "not a known output") || + strings.Contains(warning, "declares outputs") +} + // skipDirs are directory names that should be excluded from recursive scanning. var skipDirs = map[string]bool{ ".playwright-cli": true, diff --git a/docs/WFCTL.md b/docs/WFCTL.md index a72bcf5f..bcaac80d 100644 --- a/docs/WFCTL.md +++ b/docs/WFCTL.md @@ -421,7 +421,9 @@ wfctl validate [options] [config2.yaml ...] | Flag | Default | Description | |------|---------|-------------| -| `-strict` | `false` | Enable strict validation (no empty modules allowed) | +| `-strict` | `true` | Enable strict validation (default; retained for compatibility) | +| `-loose` | `false` | Allow legacy loose validation for transitional configs (planned for removal in v1.0) | +| `-non-strict` | `false` | Alias for `--loose` | | `-skip-unknown-types` | `false` | Skip unknown module/workflow/trigger type checks | | `-allow-no-entry-points` | `false` | Allow configs with no triggers, routes, subscriptions, or jobs | | `-dir` | _(none)_ | Validate all `.yaml`/`.yml` files in a directory (recursive) | @@ -433,7 +435,7 @@ wfctl validate [options] [config2.yaml ...] wfctl validate config.yaml wfctl validate example/*.yaml wfctl validate --dir ./example/ -wfctl validate --strict admin/config.yaml +wfctl validate --loose legacy/config.yaml wfctl validate --skip-unknown-types example/*.yaml wfctl validate --plugin-dir data/plugins config.yaml ``` diff --git a/schema/step_inference.go b/schema/step_inference.go index b4c420ea..3c94714a 100644 --- a/schema/step_inference.go +++ b/schema/step_inference.go @@ -1,6 +1,9 @@ package schema -import "sort" +import ( + "sort" + "strings" +) // InferredOutput describes a single inferred output key for a step instance. type InferredOutput struct { @@ -18,6 +21,8 @@ func (r *StepSchemaRegistry) InferStepOutputs(stepType string, stepConfig map[st return inferSetOutputs(stepConfig) case "step.db_query": return inferDBQueryOutputs(stepConfig) + case "step.db_exec": + return inferDBExecOutputs(stepConfig) case "step.db_query_cached": return inferDBQueryCachedOutputs(stepConfig) case "step.request_parse": @@ -69,15 +74,136 @@ func inferDBQueryOutputs(cfg map[string]any) []InferredOutput { } } +func inferDBExecOutputs(cfg map[string]any) []InferredOutput { + returning, _ := cfg["returning"].(bool) + if returning { + return inferDBQueryOutputs(cfg) + } + return []InferredOutput{ + {Key: "affected_rows", Type: "number", Description: "Number of rows affected by the statement"}, + {Key: "ignored_error", Type: "string", Description: "Error text when ignore_error is enabled and execution fails"}, + {Key: "last_id", Type: "string", Description: "Last inserted row ID as a string"}, + } +} + func inferDBQueryCachedOutputs(cfg map[string]any) []InferredOutput { - base := inferDBQueryOutputs(cfg) - out := make([]InferredOutput, 0, len(base)+1) - out = append(out, InferredOutput{Key: "cache_hit", Type: "boolean", Description: "Whether the result came from cache"}) - out = append(out, base...) + mode, _ := cfg["mode"].(string) + out := []InferredOutput{{Key: "cache_hit", Type: "boolean", Description: "Whether the result came from cache"}} + if mode == "list" { + out = append(out, + InferredOutput{Key: "count", Type: "number", Description: "Number of rows returned"}, + InferredOutput{Key: "rows", Type: "array", Description: "All result rows"}, + ) + sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key }) + return out + } + + query, _ := cfg["query"].(string) + cols := extractSQLColumnsForOutputs(query) + if len(cols) == 0 { + out = append(out, InferredOutput{Key: "(query-column)", Type: "any", Description: "Selected column emitted as a top-level field"}) + sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key }) + return out + } + for _, col := range cols { + out = append(out, InferredOutput{Key: col, Type: "any", Description: "Selected column emitted as a top-level field"}) + } sort.Slice(out, func(i, j int) bool { return out[i].Key < out[j].Key }) return out } +func extractSQLColumnsForOutputs(query string) []string { + query = strings.Join(strings.Fields(query), " ") + upper := strings.ToUpper(query) + selectIdx := strings.Index(upper, "SELECT ") + if selectIdx < 0 { + return nil + } + + selectStart := selectIdx + len("SELECT ") + fromIdx := topLevelSQLKeywordIndex(query, selectStart, " FROM ") + selectClause := query[selectStart:] + if fromIdx > selectStart { + selectClause = query[selectStart:fromIdx] + } + if strings.HasPrefix(strings.ToUpper(strings.TrimSpace(selectClause)), "DISTINCT ") { + selectClause = strings.TrimSpace(selectClause)[9:] + } + + var columns []string + depth := 0 + current := "" + for _, ch := range selectClause { + switch ch { + case '(': + depth++ + current += string(ch) + case ')': + depth-- + current += string(ch) + case ',': + if depth == 0 { + if col := extractSQLColumnNameForOutputs(strings.TrimSpace(current)); col != "" { + columns = append(columns, col) + } + current = "" + } else { + current += string(ch) + } + default: + current += string(ch) + } + } + if col := extractSQLColumnNameForOutputs(strings.TrimSpace(current)); col != "" { + columns = append(columns, col) + } + return columns +} + +func topLevelSQLKeywordIndex(query string, start int, keyword string) int { + upper := strings.ToUpper(query) + depth := 0 + var quote byte + for i := start; i <= len(query)-len(keyword); i++ { + ch := query[i] + if quote != 0 { + if ch == quote { + quote = 0 + } + continue + } + switch ch { + case '\'', '"': + quote = ch + case '(': + depth++ + case ')': + if depth > 0 { + depth-- + } + default: + if depth == 0 && strings.HasPrefix(upper[i:], keyword) { + return i + } + } + } + return -1 +} + +func extractSQLColumnNameForOutputs(expr string) string { + if expr == "" || expr == "*" { + return "" + } + upper := strings.ToUpper(expr) + if asIdx := strings.LastIndex(upper, " AS "); asIdx >= 0 { + return strings.Trim(strings.TrimSpace(expr[asIdx+4:]), "\"'`") + } + if dotIdx := strings.LastIndex(expr, "."); dotIdx >= 0 { + return strings.Trim(strings.TrimSpace(expr[dotIdx+1:]), "\"'`") + } + return strings.Trim(strings.TrimSpace(expr), "\"'`") +} + func inferRequestParseOutputs(_ map[string]any) []InferredOutput { return []InferredOutput{ {Key: "body", Type: "any", Description: "Parsed request body"}, diff --git a/schema/step_schema_builtins.go b/schema/step_schema_builtins.go index d4a1a39a..1ed295b5 100644 --- a/schema/step_schema_builtins.go +++ b/schema/step_schema_builtins.go @@ -63,8 +63,9 @@ func (r *StepSchemaRegistry) registerBuiltins() { {Key: "default", Type: FieldTypeString, Description: "Default step name when no route matches", Required: true}, }, Outputs: []StepOutputDef{ - {Key: "matched_route", Type: "string", Description: "The route value that was matched"}, + {Key: "matched_value", Type: "string", Description: "The field value that was matched"}, {Key: "next_step", Type: "string", Description: "Name of the next step to execute"}, + {Key: "used_default", Type: "boolean", Description: "Whether the default route was used"}, }, }) @@ -135,6 +136,7 @@ func (r *StepSchemaRegistry) registerBuiltins() { {Key: "mode", Type: FieldTypeSelect, Description: "Result mode", Options: []string{"single", "list"}, DefaultValue: "list"}, }, Outputs: []StepOutputDef{ + {Key: "found", Type: "boolean", Description: "Whether a row was found (single mode)"}, {Key: "row", Type: "map", Description: "First result row as key-value map (single mode)"}, {Key: "rows", Type: "[]map", Description: "All result rows (list mode)"}, {Key: "count", Type: "number", Description: "Number of rows returned (list mode)"}, @@ -151,8 +153,9 @@ func (r *StepSchemaRegistry) registerBuiltins() { {Key: "params", Type: FieldTypeArray, Description: "Statement parameters (positional $1, $2...)"}, }, Outputs: []StepOutputDef{ - {Key: "rows_affected", Type: "number", Description: "Number of rows affected by the statement"}, - {Key: "last_insert_id", Type: "number", Description: "Last inserted row ID (if supported by driver)"}, + {Key: "affected_rows", Type: "number", Description: "Number of rows affected by the statement"}, + {Key: "last_id", Type: "string", Description: "Last inserted row ID (if supported by driver)"}, + {Key: "ignored_error", Type: "string", Description: "Error text when ignore_error is enabled and execution fails"}, }, }) @@ -169,7 +172,7 @@ func (r *StepSchemaRegistry) registerBuiltins() { {Key: "mode", Type: FieldTypeSelect, Description: "Result mode", Options: []string{"single", "list"}, DefaultValue: "single"}, }, Outputs: []StepOutputDef{ - {Key: "row", Type: "map", Description: "First result row (single mode, not nested under 'row')"}, + {Key: "(query-column)", Type: "any", Description: "Selected columns are emitted as top-level fields in single mode"}, {Key: "rows", Type: "[]map", Description: "All result rows (list mode)"}, {Key: "count", Type: "number", Description: "Number of rows (list mode)"}, {Key: "cache_hit", Type: "boolean", Description: "Whether the result came from cache"}, diff --git a/schema/step_schema_test.go b/schema/step_schema_test.go index 4e650f41..05ed50df 100644 --- a/schema/step_schema_test.go +++ b/schema/step_schema_test.go @@ -215,6 +215,23 @@ func TestInferStepOutputs_DBQuery_List(t *testing.T) { } } +func TestInferStepOutputs_DBExec(t *testing.T) { + reg := NewStepSchemaRegistry() + outputs := reg.InferStepOutputs("step.db_exec", map[string]any{}) + keys := map[string]bool{} + for _, o := range outputs { + keys[o.Key] = true + } + for _, want := range []string{"affected_rows", "last_id", "ignored_error"} { + if !keys[want] { + t.Errorf("expected %s in db_exec outputs, got %v", want, outputs) + } + } + if keys["rows_affected"] || keys["last_insert_id"] { + t.Errorf("db_exec runtime emits affected_rows/last_id, not rows_affected/last_insert_id: %v", outputs) + } +} + func TestInferStepOutputs_DBQueryCached(t *testing.T) { reg := NewStepSchemaRegistry() outputs := reg.InferStepOutputs("step.db_query_cached", map[string]any{ @@ -229,6 +246,24 @@ func TestInferStepOutputs_DBQueryCached(t *testing.T) { } } +func TestInferStepOutputs_DBQueryCached_SingleModeUsesFlatColumns(t *testing.T) { + reg := NewStepSchemaRegistry() + outputs := reg.InferStepOutputs("step.db_query_cached", map[string]any{ + "mode": "single", + "query": "SELECT COALESCE((SELECT settings->>'mock_payments' FROM tenants WHERE id = $1), 'false') AS mock_payments", + }) + keys := map[string]bool{} + for _, o := range outputs { + keys[o.Key] = true + } + if !keys["mock_payments"] || !keys["cache_hit"] { + t.Errorf("expected mock_payments and cache_hit, got %v", outputs) + } + if keys["row"] || keys["found"] { + t.Errorf("db_query_cached single-mode runtime emits flat columns, not row/found: %v", outputs) + } +} + func TestInferStepOutputs_RequestParse(t *testing.T) { reg := NewStepSchemaRegistry() outputs := reg.InferStepOutputs("step.request_parse", map[string]any{ diff --git a/validation/pipeline_refs.go b/validation/pipeline_refs.go index b8932853..cda13a34 100644 --- a/validation/pipeline_refs.go +++ b/validation/pipeline_refs.go @@ -37,6 +37,9 @@ var stepFieldDotRe = regexp.MustCompile(`\.steps\.([a-zA-Z_][a-zA-Z0-9_-]*)\.([a // stepRefIndexRe matches index .steps "STEP_NAME" patterns. var stepRefIndexRe = regexp.MustCompile(`index\s+\.steps\s+"([^"]+)"`) +// stepIndexFieldRe matches index .steps "STEP_NAME" "FIELD_NAME" patterns. +var stepIndexFieldRe = regexp.MustCompile(`index\s+\.steps\s+"([^"]+)"\s+"([^"]+)"`) + // stepRefFuncRe matches step "STEP_NAME" function calls at the start of an // action, after a pipe, or after an opening parenthesis. var stepRefFuncRe = regexp.MustCompile(`(?:^|\||\()\s*step\s+"([^"]+)"`) @@ -194,7 +197,15 @@ func validatePipelineTemplateRefs(pipelineName string, stepsRaw []any, reg *sche } } - // Check for step name references via index (no field path resolvable) + // Check for step output field references via index + // (index .steps "STEP_NAME" "FIELD_NAME" ...) + indexFieldMatches := stepIndexFieldRe.FindAllStringSubmatch(actionContent, -1) + for _, m := range indexFieldMatches { + refStepName, refField := m[1], m[2] + validateStepOutputField(pipelineName, stepName, refStepName, refField, stepMeta, reg, result) + } + + // Check for step name references via index. indexMatches := stepRefIndexRe.FindAllStringSubmatch(actionContent, -1) for _, m := range indexMatches { refName := m[1] @@ -390,7 +401,10 @@ func collectTemplateStrings(data any) []string { results = append(results, v) } case map[string]any: - for _, val := range v { + for key, val := range v { + if key == "step" || key == "steps" { + continue + } results = append(results, collectTemplateStrings(val)...) } case []any: @@ -441,12 +455,16 @@ func ExtractSQLColumns(query string) []string { // Find SELECT ... FROM upper := strings.ToUpper(query) selectIdx := strings.Index(upper, "SELECT ") - fromIdx := strings.Index(upper, " FROM ") - if selectIdx < 0 || fromIdx < 0 || fromIdx <= selectIdx { + if selectIdx < 0 { return nil } - selectClause := query[selectIdx+7 : fromIdx] + selectStart := selectIdx + len("SELECT ") + fromIdx := topLevelSQLKeywordIndex(query, selectStart, " FROM ") + selectClause := query[selectStart:] + if fromIdx > selectStart { + selectClause = query[selectStart:fromIdx] + } // Handle DISTINCT if strings.HasPrefix(strings.ToUpper(strings.TrimSpace(selectClause)), "DISTINCT ") { @@ -484,6 +502,36 @@ func ExtractSQLColumns(query string) []string { return columns } +func topLevelSQLKeywordIndex(query string, start int, keyword string) int { + upper := strings.ToUpper(query) + depth := 0 + var quote byte + for i := start; i <= len(query)-len(keyword); i++ { + ch := query[i] + if quote != 0 { + if ch == quote { + quote = 0 + } + continue + } + switch ch { + case '\'', '"': + quote = ch + case '(': + depth++ + case ')': + if depth > 0 { + depth-- + } + default: + if depth == 0 && strings.HasPrefix(upper[i:], keyword) { + return i + } + } + } + return -1 +} + // extractColumnName extracts the effective column name from a SELECT expression. // Handles: "col", "table.col", "expr AS alias", "COALESCE(...) AS alias". func extractColumnName(expr string) string { diff --git a/validation/pipeline_refs_test.go b/validation/pipeline_refs_test.go index 4a1cf107..b89db16e 100644 --- a/validation/pipeline_refs_test.go +++ b/validation/pipeline_refs_test.go @@ -160,6 +160,113 @@ func TestValidatePipelineTemplateRefs_KnownOutputField(t *testing.T) { } } +func TestValidatePipelineTemplateRefs_DBQueryCachedSingleRejectsRowWrapper(t *testing.T) { + pipelines := map[string]any{ + "payment-create-intent": map[string]any{ + "steps": []any{ + map[string]any{ + "name": "check_mock_mode", + "type": "step.db_query_cached", + "config": map[string]any{ + "mode": "single", + "query": "SELECT COALESCE((SELECT settings->>'mock_payments' FROM tenants WHERE id = $1), 'false') AS mock_payments", + }, + }, + map[string]any{ + "name": "set_mock_flag", + "type": "step.set", + "config": map[string]any{ + "values": map[string]any{ + "is_mock": `{{ index .steps "check_mock_mode" "row" "mock_payments" | default "false" }}`, + }, + }, + }, + }, + }, + } + result := validation.ValidatePipelineTemplateRefs(pipelines) + found := false + for _, w := range result.Warnings { + if strings.Contains(w, `references check_mock_mode.row`) && strings.Contains(w, "mock_payments") { + found = true + break + } + } + if !found { + t.Fatalf("expected warning for stale db_query_cached row wrapper, got %v", result.Warnings) + } +} + +func TestValidatePipelineTemplateRefs_DBQueryCachedSingleAcceptsFlatColumn(t *testing.T) { + pipelines := map[string]any{ + "payment-create-intent": map[string]any{ + "steps": []any{ + map[string]any{ + "name": "check_mock_mode", + "type": "step.db_query_cached", + "config": map[string]any{ + "mode": "single", + "query": "SELECT COALESCE((SELECT settings->>'mock_payments' FROM tenants WHERE id = $1), 'false') AS mock_payments", + }, + }, + map[string]any{ + "name": "set_mock_flag", + "type": "step.set", + "config": map[string]any{ + "values": map[string]any{ + "is_mock": `{{ step "check_mock_mode" "mock_payments" | default "false" }}`, + }, + }, + }, + }, + }, + } + result := validation.ValidatePipelineTemplateRefs(pipelines) + for _, w := range result.Warnings { + if strings.Contains(w, "check_mock_mode") { + t.Fatalf("unexpected warning for flat db_query_cached field: %v", result.Warnings) + } + } +} + +func TestValidatePipelineTemplateRefs_DoesNotTreatNestedLoopStepsAsTopLevel(t *testing.T) { + pipelines := map[string]any{ + "price-sync-nightly": map[string]any{ + "steps": []any{ + map[string]any{ + "name": "process_tracked_items", + "type": "step.foreach", + "config": map[string]any{ + "collection": "steps.items.rows", + "steps": []any{ + map[string]any{ + "name": "check_price", + "type": "step.set", + "config": map[string]any{ + "values": map[string]any{"provider_price": "100"}, + }, + }, + map[string]any{ + "name": "record_price", + "type": "step.db_exec", + "config": map[string]any{ + "params": []any{`{{ index .steps "check_price" "provider_price" }}`}, + }, + }, + }, + }, + }, + }, + }, + } + result := validation.ValidatePipelineTemplateRefs(pipelines) + for _, w := range result.Warnings { + if strings.Contains(w, "check_price") && strings.Contains(w, "does not exist") { + t.Fatalf("nested foreach step should not be validated as a top-level step ref: %v", result.Warnings) + } + } +} + // TestValidatePipelineTemplateRefs_SelfReference checks that a step referencing // its own output produces a warning. func TestValidatePipelineTemplateRefs_SelfReference(t *testing.T) { From 0cbfcba5535b867178dd8d4d92b7273632ffccc1 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 18 May 2026 18:21:12 -0400 Subject: [PATCH 2/2] fix: use structured pipeline ref warnings --- cmd/wfctl/main_test.go | 57 ++++++++++++++++++++++++++ cmd/wfctl/validate.go | 24 ++++------- validation/pipeline_refs.go | 79 +++++++++++++++++++++++++++++++------ 3 files changed, 131 insertions(+), 29 deletions(-) diff --git a/cmd/wfctl/main_test.go b/cmd/wfctl/main_test.go index 3ea0483b..44be4d52 100644 --- a/cmd/wfctl/main_test.go +++ b/cmd/wfctl/main_test.go @@ -525,3 +525,60 @@ modules: reg.Unregister("step.schema_validate_testonly") }) } + +func TestRunValidatePluginDirUsesStepSchemasForPipelineRefs(t *testing.T) { + pluginsDir := t.TempDir() + pluginSubdir := filepath.Join(pluginsDir, "my-ext-plugin-output-schema") + if err := os.MkdirAll(pluginSubdir, 0o755); err != nil { + t.Fatal(err) + } + manifest := `{ + "name": "my-ext-plugin-output-schema", + "version": "1.0.0", + "stepTypes": ["step.output_schema_validate_testonly"], + "stepSchemas": [ + { + "type": "step.output_schema_validate_testonly", + "description": "test-only plugin step output schema", + "outputs": [ + {"key": "known_output", "type": "string"} + ] + } + ] + }` + if err := os.WriteFile(filepath.Join(pluginSubdir, "plugin.json"), []byte(manifest), 0o644); err != nil { + t.Fatal(err) + } + reg := schema.GetStepSchemaRegistry() + t.Cleanup(func() { + schema.UnregisterModuleType("step.output_schema_validate_testonly") + reg.Unregister("step.output_schema_validate_testonly") + }) + + dir := t.TempDir() + path := writeTestConfig(t, dir, "workflow.yaml", ` +modules: + - name: server + type: http.server + config: + address: ":8080" +pipelines: + test: + steps: + - name: plugin-step + type: step.output_schema_validate_testonly + - name: consume + type: step.set + config: + values: + result: '{{ step "plugin-step" "missing_output" }}' +`) + + err := runValidate([]string{"--plugin-dir", pluginsDir, "--allow-no-entry-points", path}) + if err == nil { + t.Fatal("expected strict validation to reject plugin step output field not declared by plugin schema") + } + if !strings.Contains(err.Error(), "missing_output") { + t.Fatalf("expected error to mention missing plugin output field, got: %v", err) + } +} diff --git a/cmd/wfctl/validate.go b/cmd/wfctl/validate.go index 0a2ff0c5..ba46c3f1 100644 --- a/cmd/wfctl/validate.go +++ b/cmd/wfctl/validate.go @@ -241,15 +241,13 @@ func validateFile(cfgPath string, strict, skipUnknownTypes, allowNoEntryPoints b } if cfg.Pipelines != nil { - if refs := validation.ValidatePipelineTemplateRefs(cfg.Pipelines); refs.HasIssues() { - var findings, blocking []string + if refs := validation.ValidatePipelineTemplateRefs(cfg.Pipelines, schema.GetStepSchemaRegistry()); refs.HasIssues() { + var findings []string + blocking := refs.BlockingWarningMessages() for _, w := range refs.Warnings { msg := "pipeline-refs warning: " + w findings = append(findings, msg) - if isBlockingPipelineRefWarning(w) { - blocking = append(blocking, msg) - } - if !strict || !isBlockingPipelineRefWarning(w) { + if !strict || !containsString(blocking, w) { fmt.Fprintf(os.Stderr, " WARN %s: %s\n", cfgPath, msg) } } @@ -260,6 +258,9 @@ func validateFile(cfgPath string, strict, skipUnknownTypes, allowNoEntryPoints b return fmt.Errorf("%s", strings.Join(findings, "\n")) } if strict && len(blocking) > 0 { + for i, w := range blocking { + blocking[i] = "pipeline-refs warning: " + w + } return fmt.Errorf("%s", strings.Join(blocking, "\n")) } } @@ -270,17 +271,6 @@ func validateFile(cfgPath string, strict, skipUnknownTypes, allowNoEntryPoints b return nil } -func isBlockingPipelineRefWarning(warning string) bool { - if strings.Contains(warning, "hyphenated dot-access") { - return false - } - return strings.Contains(warning, "does not exist in this pipeline") || - strings.Contains(warning, "has not executed yet") || - strings.Contains(warning, "references itself") || - strings.Contains(warning, "not a known output") || - strings.Contains(warning, "declares outputs") -} - // skipDirs are directory names that should be excluded from recursive scanning. var skipDirs = map[string]bool{ ".playwright-cli": true, diff --git a/validation/pipeline_refs.go b/validation/pipeline_refs.go index cda13a34..5090ac3a 100644 --- a/validation/pipeline_refs.go +++ b/validation/pipeline_refs.go @@ -11,11 +11,26 @@ import ( "github.com/GoCodeAlone/workflow/schema" ) +// RefWarningCode classifies a pipeline reference warning so callers can make +// policy decisions without parsing human-readable warning text. +type RefWarningCode string + +const ( + RefWarningHyphenatedDotAccess RefWarningCode = "hyphenated_dot_access" + RefWarningUnknownOutput RefWarningCode = "unknown_output" + RefWarningMissingStep RefWarningCode = "missing_step" + RefWarningSelfReference RefWarningCode = "self_reference" + RefWarningForwardReference RefWarningCode = "forward_reference" + RefWarningSQLColumnMismatch RefWarningCode = "sql_column_mismatch" +) + // RefValidationResult holds the outcome of pipeline template reference validation. -// Warnings are suspicious but non-fatal references; Errors are definitively wrong. +// Warnings are suspicious but non-fatal references; WarningCodes classifies each +// warning at the same index. Errors are definitively wrong. type RefValidationResult struct { - Warnings []string - Errors []string + Warnings []string + WarningCodes []RefWarningCode + Errors []string } // HasIssues returns true when there are any warnings or errors. @@ -23,6 +38,45 @@ func (r *RefValidationResult) HasIssues() bool { return len(r.Warnings) > 0 || len(r.Errors) > 0 } +// AddWarning records a warning and its stable machine-readable code. +func (r *RefValidationResult) AddWarning(code RefWarningCode, message string) { + r.Warnings = append(r.Warnings, message) + r.WarningCodes = append(r.WarningCodes, code) +} + +// BlockingWarningMessages returns warning messages that represent deterministic +// runtime failures and should fail strict validation. +func (r *RefValidationResult) BlockingWarningMessages() []string { + blocking := make([]string, 0, len(r.Warnings)) + for i, warning := range r.Warnings { + code := RefWarningCode("") + if i < len(r.WarningCodes) { + code = r.WarningCodes[i] + } + if IsBlockingRefWarningCode(code) { + blocking = append(blocking, warning) + } + } + return blocking +} + +// IsBlockingRefWarningCode reports whether a warning code should fail strict +// validation. Hyphenated dot-access is non-blocking because the runtime rewrites +// it, but missing/forward/self refs and invalid output/SQL fields are runtime +// failures. +func IsBlockingRefWarningCode(code RefWarningCode) bool { + switch code { + case RefWarningMissingStep, + RefWarningSelfReference, + RefWarningForwardReference, + RefWarningUnknownOutput, + RefWarningSQLColumnMismatch: + return true + default: + return false + } +} + // templateExprRe matches template actions {{ ... }}. var templateExprRe = regexp.MustCompile(`\{\{(.*?)\}\}`) @@ -228,7 +282,7 @@ func validatePipelineTemplateRefs(pipelineName string, stepsRaw []any, reg *sche // Warn on hyphenated dot-access (auto-fixed but suggest preferred syntax) if hyphenDotRe.MatchString(actionContent) { - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningHyphenatedDotAccess, fmt.Sprintf("pipeline %q step %q: template uses hyphenated dot-access which is auto-fixed; prefer step \"name\" \"field\" syntax", pipelineName, stepName)) } } @@ -275,7 +329,7 @@ func validateStepOutputField(pipelineName, currentStep, refStepName, refField st for _, o := range outputs { keys = append(keys, o.Key) } - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningUnknownOutput, fmt.Sprintf("pipeline %q step %q: references %s.%s but step %q (%s) declares outputs: %s", pipelineName, currentStep, refStepName, refField, refStepName, meta.typ, strings.Join(keys, ", "))) } @@ -288,15 +342,15 @@ func validateStepRef(pipelineName, currentStep, refName, fieldPath string, curre refIdx, exists := stepNames[refName] switch { case !exists: - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningMissingStep, fmt.Sprintf("pipeline %q step %q: references step %q which does not exist in this pipeline", pipelineName, currentStep, refName)) return case refIdx == currentIdx: - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningSelfReference, fmt.Sprintf("pipeline %q step %q: references itself; a step cannot use its own outputs because they are not available until after execution", pipelineName, currentStep)) return case refIdx > currentIdx: - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningForwardReference, fmt.Sprintf("pipeline %q step %q: references step %q which has not executed yet (appears later in pipeline)", pipelineName, currentStep, refName)) return } @@ -339,7 +393,7 @@ func validateStepRef(pipelineName, currentStep, refName, fieldPath string, curre } } if matchedOutput == nil { - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningUnknownOutput, fmt.Sprintf("pipeline %q step %q: references step %q output field %q which is not a known output of step type %q (known outputs: %s)", pipelineName, currentStep, refName, firstField, info.stepType, joinOutputKeys(outputs))) return @@ -360,7 +414,7 @@ func validateStepRef(pipelineName, currentStep, refName, fieldPath string, curre } } if !found { - result.Warnings = append(result.Warnings, + result.AddWarning(RefWarningSQLColumnMismatch, fmt.Sprintf("pipeline %q step %q: references step %q output field \"row.%s\" but the SQL query does not select column %q (available: %s)", pipelineName, currentStep, refName, columnName, columnName, strings.Join(sqlCols, ", "))) } @@ -391,8 +445,9 @@ func validatePlainStepRefs(pipelineName, stepName string, stepIdx int, stepCfg m } // collectTemplateStrings recursively finds all strings containing {{ in a value tree. -// This intentionally scans all fields (not just "config") because template expressions -// can appear in conditions, names, and other step fields. +// It scans all fields except nested "step"/"steps" definitions. Nested step lists +// need their own ordering context; validating them as top-level pipeline steps +// produces false missing-step findings. func collectTemplateStrings(data any) []string { var results []string switch v := data.(type) {