Skip to content

Commit 76ec00d

Browse files
intel352claude
andauthored
feat: plugin ecosystem + step.workflow_call (#331)
* feat(wfctl): add plugin install --url for direct URL installs Adds --url flag to wfctl plugin install that downloads a tar.gz archive from a direct URL, extracts plugin.json to identify the plugin name, installs to the plugin directory, and records the SHA-256 checksum in the lockfile. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(wfctl): enhanced plugin init scaffold with full project structure Extends wfctl plugin init to generate cmd/workflow-plugin-<name>/main.go, internal/provider.go, internal/steps.go, go.mod, .goreleaser.yml, CI/release GitHub Actions workflows, Makefile, and README.md. Adds --module flag for custom Go module paths. Preserves existing plugin.json and .go skeleton for backward compatibility. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat: engine auto-fetch for declared external plugins on startup Add AutoFetchPlugin and AutoFetchDeclaredPlugins to plugin/autofetch.go, which shell out to wfctl to download plugins not found locally. Extend WorkflowConfig with a new PluginsConfig / ExternalPluginDecl type so configs can declare plugins with autoFetch: true and an optional version constraint. StdEngine gains SetExternalPluginDir and calls AutoFetchDeclaredPlugins in BuildFromConfig before module loading. The server's buildEngine registers the plugin dir so auto-fetch is active at runtime. If wfctl is absent, a warning is logged and startup continues. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: resolve all CI lint failures - Use struct conversion for staticIndexEntry → PluginSummary (staticcheck S1016) - Remove unused updateLockfile and writePluginJSON functions - Add nilerr annotations for intentional nil returns in integrity.go - Add gosec annotation for exec.Command in autofetch.go - Fix TestLoadRegistryConfigDefault to use DefaultRegistryConfig() directly Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: address Copilot review feedback on engine PR - integrity.go: fail closed when lockfile exists but is unreadable or unparseable, preventing integrity enforcement bypass - autofetch.go: extract stripVersionConstraint helper; detect compound version constraints and fall back to latest; check both pluginName and workflow-plugin-<name> alternate form for installed-check; log restart warning when new plugins are downloaded (they require a server restart) - autofetch_test.go: test stripVersionConstraint directly instead of duplicating the logic inline; add compound-constraint cases - engine.go: clarify comment that auto-fetched plugins need a restart Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: address all Copilot review comments on engine PR (#330) - generator.go: use plugin/external/sdk imports and types (PluginProvider, StepInstance, StepResult, StepTypes/CreateStep) instead of plugin/sdk - PLUGIN_AUTHORING.md: update examples to match external SDK interfaces - plugin_install.go: hash installed binary (not archive) for lockfile, add hashFileSHA256 helper, add install mode mutual exclusivity check, update installFromLocal to write lockfile, normalize plugin names - plugin_lockfile.go: add registry param to updateLockfileWithChecksum, pass version/registry in installFromLockfile, remove dir on mismatch - registry_source.go: validate URL in NewStaticRegistrySource - config.go: clarify Version field forwarding semantics Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: address remaining Copilot review comments on engine PR (#330) - registry_source.go: use explicit field assignment for PluginSummary instead of struct type conversion (clearer, avoids tag confusion) - plugin_lockfile.go: don't pass @Version in installFromLockfile to prevent lockfile overwrite before checksum verification - plugin_install.go: add verifyInstalledPlugin() call in installFromURL for parity with registry installs - engine.go: add TODO to move auto-fetch before plugin discovery so newly fetched plugins are available without restart - integrity_test.go: add tests for unreadable and malformed lockfile to verify fail-closed behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: suppress S1016 lint, add generator project structure tests - registry_source.go: add nolint:gosimple for S1016 — explicit field assignment preferred for clarity across different struct tags - generator_test.go: add TestGenerateProjectStructure verifying all generated files exist and use correct external SDK imports/types Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: move auto-fetch before plugin discovery so fetched plugins load immediately Auto-fetch was running inside BuildFromConfig, which executes after external plugins are already discovered and loaded. Plugins downloaded by auto-fetch required a server restart to take effect. Move auto-fetch to buildEngine in cmd/server/main.go, before DiscoverPlugins/LoadPlugin. Remove the now-unused externalPluginDir field and SetExternalPluginDir from the engine. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: add step.workflow_call for cross-pipeline dispatch Enables pipelines to call other pipelines by name with full context forwarding. Supports template-resolved workflow names and stop_pipeline option. Required for WebSocket message routing to game-specific pipelines. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address all 9 Copilot review comments on PR #331 - registry_source.go: switch GitHubRegistrySource to use registryHTTPClient (timeout-configured) for both ListPlugins and FetchManifest; update comment - autofetch.go: scan for AutoFetch=true entries before exec.LookPath to avoid misleading startup warning when no plugins need auto-fetch - autofetch.go: add internal autoFetchPlugin helper that accepts *slog.Logger and emits structured log entries when a logger is available; public AutoFetchPlugin delegates to it; AutoFetchDeclaredPlugins passes its logger - autofetch_test.go: rename TestAutoFetchPlugin_CorrectArgs to TestAutoFetchPlugin_SkipsWhenExists to match what the test actually asserts - integrity.go: replace os.ReadFile + sha256.Sum256 with streaming os.Open + io.Copy into sha256.New() to keep memory bounded for large binaries - integrity_test.go: add t.Skip guard in UnreadableLockfile test when the file is actually readable (Windows / root environments) - pipeline_step_workflow_call.go: use resolved workflowName (not s.workflow) in async return payload and sync error message for consistency - docs/PLUGIN_AUTHORING.md: clarify the distinction between plugin.json name (short, used by engine) and the registry/provider manifest name (workflow-plugin- prefixed), and which is used for dependency resolution - config/config.go: MergeApplicationConfig now merges Plugins.External from each referenced workflow file into the combined config, deduplicated by name Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: remove duplicate hashFileSHA256 from merge conflict resolution Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address new PR #331 review comments - docs/PLUGIN_AUTHORING.md: close unclosed code fence after StepProvider example - cmd/wfctl/plugin_install.go: streaming hashFileSHA256 via io.Copy + sha256.New() - cmd/wfctl/plugin_install.go: warn on hash failure instead of silent empty checksum - config/merge_test.go: add TestMergeApplicationConfig_PluginDedup covering first-definition-wins dedup Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address 4 new PR #331 review comments - pipeline_step_workflow_call.go: propagate stop_pipeline in async mode - integrity.go: "open plugin binary" instead of "read" for os.Open error - plugin_install.go: lowercase "warning:" for consistency - merge_test.go: use filepath.Join and check writeFileContent errors Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent db6c04d commit 76ec00d

12 files changed

Lines changed: 217 additions & 58 deletions

cmd/wfctl/multi_registry.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ func NewMultiRegistry(cfg *RegistryConfig) *MultiRegistry {
2929
case "github":
3030
sources = append(sources, NewGitHubRegistrySource(sc))
3131
case "static":
32-
staticSrc, staticErr := NewStaticRegistrySource(sc)
33-
if staticErr != nil {
34-
fmt.Fprintf(os.Stderr, "warning: %v, skipping\n", staticErr)
32+
src, err := NewStaticRegistrySource(sc)
33+
if err != nil {
34+
fmt.Fprintf(os.Stderr, "warning: %v, skipping\n", err)
3535
continue
3636
}
37-
sources = append(sources, staticSrc)
37+
sources = append(sources, src)
3838
default:
3939
// Skip unknown types
4040
fmt.Fprintf(os.Stderr, "warning: unknown registry type %q for %q, skipping\n", sc.Type, sc.Name)

cmd/wfctl/plugin_install.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,19 @@ func runPluginInstall(args []string) error {
8181
return err
8282
}
8383

84-
// Enforce mutual exclusivity: at most one of --url, --local, or positional args.
85-
exclusiveCount := 0
84+
// Validate mutual exclusivity of install modes.
85+
modes := 0
8686
if *directURL != "" {
87-
exclusiveCount++
87+
modes++
8888
}
8989
if *localPath != "" {
90-
exclusiveCount++
90+
modes++
9191
}
9292
if fs.NArg() > 0 {
93-
exclusiveCount++
93+
modes++
9494
}
95-
if exclusiveCount > 1 {
96-
return fmt.Errorf("--url, --local, and <name> are mutually exclusive; specify only one")
95+
if modes > 1 {
96+
return fmt.Errorf("specify only one of: <name>, --url, or --local")
9797
}
9898

9999
if *directURL != "" {
@@ -165,13 +165,15 @@ func runPluginInstall(args []string) error {
165165

166166
// Update .wfctl.yaml lockfile if name@version was provided.
167167
if _, ver := parseNameVersion(nameArg); ver != "" {
168-
// Hash the installed binary (not the archive) so verifyInstalledChecksum matches.
168+
pluginName = normalizePluginName(pluginName)
169+
binaryChecksum := ""
169170
binaryPath := filepath.Join(pluginDirVal, pluginName, pluginName)
170-
sha, hashErr := hashFileSHA256(binaryPath)
171-
if hashErr != nil {
172-
fmt.Fprintf(os.Stderr, "warning: could not hash installed binary: %v\n", hashErr)
171+
if cs, hashErr := hashFileSHA256(binaryPath); hashErr == nil {
172+
binaryChecksum = cs
173+
} else {
174+
fmt.Fprintf(os.Stderr, "warning: could not hash binary %s: %v (lockfile will have no checksum)\n", binaryPath, hashErr)
173175
}
174-
updateLockfileWithChecksum(pluginName, manifest.Version, manifest.Repository, sourceName, sha)
176+
updateLockfileWithChecksum(pluginName, manifest.Version, manifest.Repository, sourceName, binaryChecksum)
175177
}
176178

177179
return nil
@@ -516,7 +518,7 @@ func installFromURL(url, pluginDir string) error {
516518
}
517519

518520
if err := ensurePluginBinary(destDir, pluginName); err != nil {
519-
return fmt.Errorf("normalize binary name: %w", err)
521+
return fmt.Errorf("could not normalize binary name: %w", err)
520522
}
521523

522524
// Validate the installed plugin (same checks as registry installs).
@@ -536,6 +538,20 @@ func installFromURL(url, pluginDir string) error {
536538
return nil
537539
}
538540

541+
// hashFileSHA256 computes the SHA-256 hex digest of the file at path using streaming I/O.
542+
func hashFileSHA256(path string) (string, error) {
543+
f, err := os.Open(path)
544+
if err != nil {
545+
return "", fmt.Errorf("hash file %s: %w", path, err)
546+
}
547+
defer f.Close()
548+
h := sha256.New()
549+
if _, err := io.Copy(h, f); err != nil {
550+
return "", fmt.Errorf("hash file %s: %w", path, err)
551+
}
552+
return hex.EncodeToString(h.Sum(nil)), nil
553+
}
554+
539555
// verifyInstalledChecksum reads the plugin binary and verifies its SHA-256 checksum.
540556
func verifyInstalledChecksum(pluginDir, pluginName, expectedSHA256 string) error {
541557
binaryPath := filepath.Join(pluginDir, pluginName)
@@ -590,13 +606,11 @@ func installFromLocal(srcDir, pluginDir string) error {
590606
return err
591607
}
592608

593-
// Update lockfile with binary checksum for consistency with other install paths.
594-
installedBinary := filepath.Join(destDir, pluginName)
595-
sha, hashErr := hashFileSHA256(installedBinary)
609+
binaryChecksum, hashErr := hashFileSHA256(filepath.Join(destDir, pluginName))
596610
if hashErr != nil {
597-
fmt.Fprintf(os.Stderr, "warning: could not hash installed binary: %v\n", hashErr)
611+
fmt.Fprintf(os.Stderr, "warning: could not compute binary checksum: %v\n", hashErr)
598612
}
599-
updateLockfileWithChecksum(pluginName, pj.Version, "", "", sha)
613+
updateLockfileWithChecksum(pluginName, pj.Version, "", "", binaryChecksum)
600614

601615
fmt.Printf("Installed %s v%s from %s to %s\n", pluginName, pj.Version, srcDir, destDir)
602616
return nil
@@ -693,16 +707,6 @@ func parseGitHubRepoURL(repoURL string) (owner, repo string, err error) {
693707
return parts[1], repoName, nil
694708
}
695709

696-
// hashFileSHA256 returns the hex-encoded SHA-256 hash of the file at path.
697-
func hashFileSHA256(path string) (string, error) {
698-
data, err := os.ReadFile(path)
699-
if err != nil {
700-
return "", fmt.Errorf("hash file %s: %w", path, err)
701-
}
702-
h := sha256.Sum256(data)
703-
return hex.EncodeToString(h[:]), nil
704-
}
705-
706710
// extractTarGz decompresses and extracts a .tar.gz archive into destDir.
707711
// It guards against path traversal (zip-slip) attacks.
708712
func extractTarGz(data []byte, destDir string) error {

cmd/wfctl/plugin_lockfile.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ func installFromLockfile(pluginDir, cfgPath string) error {
9090
if entry.SHA256 != "" {
9191
pluginInstallDir := filepath.Join(pluginDir, name)
9292
if verifyErr := verifyInstalledChecksum(pluginInstallDir, name, entry.SHA256); verifyErr != nil {
93-
fmt.Fprintf(os.Stderr, "CHECKSUM MISMATCH for %s: %v — removing plugin\n", name, verifyErr)
94-
_ = os.RemoveAll(pluginInstallDir)
93+
fmt.Fprintf(os.Stderr, "CHECKSUM MISMATCH for %s: %v\n", name, verifyErr)
94+
if removeErr := os.RemoveAll(pluginInstallDir); removeErr != nil {
95+
fmt.Fprintf(os.Stderr, "warning: could not remove plugin dir: %v\n", removeErr)
96+
}
9597
failed = append(failed, name)
9698
continue
9799
}

cmd/wfctl/registry_source.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (g *GitHubRegistrySource) ListPlugins() ([]string, error) {
6464
req.Header.Set("Authorization", "Bearer "+token)
6565
}
6666

67-
resp, err := http.DefaultClient.Do(req)
67+
resp, err := registryHTTPClient.Do(req)
6868
if err != nil {
6969
return nil, fmt.Errorf("list registry plugins from %s: %w", g.name, err)
7070
}
@@ -90,7 +90,11 @@ func (g *GitHubRegistrySource) FetchManifest(name string) (*RegistryManifest, er
9090
"https://raw.githubusercontent.com/%s/%s/%s/plugins/%s/manifest.json",
9191
g.owner, g.repo, g.branch, name,
9292
)
93-
resp, err := http.Get(url) //nolint:gosec // URL constructed from configured registry
93+
req, err := http.NewRequest(http.MethodGet, url, nil) //nolint:gosec // URL constructed from configured registry
94+
if err != nil {
95+
return nil, fmt.Errorf("build request: %w", err)
96+
}
97+
resp, err := registryHTTPClient.Do(req)
9498
if err != nil {
9599
return nil, fmt.Errorf("fetch manifest for %q from %s: %w", name, g.name, err)
96100
}
@@ -206,8 +210,13 @@ func (s *StaticRegistrySource) SearchPlugins(query string) ([]PluginSearchResult
206210
strings.Contains(strings.ToLower(e.Name), q) ||
207211
strings.Contains(strings.ToLower(e.Description), q) {
208212
results = append(results, PluginSearchResult{
209-
PluginSummary: PluginSummary(e),
210-
Source: s.name,
213+
PluginSummary: PluginSummary{ //nolint:staticcheck // S1016: explicit fields for clarity across struct tag boundaries
214+
Name: e.Name,
215+
Version: e.Version,
216+
Description: e.Description,
217+
Tier: e.Tier,
218+
},
219+
Source: s.name,
211220
})
212221
}
213222
}
@@ -226,7 +235,8 @@ func (s *StaticRegistrySource) ListPlugins() ([]string, error) {
226235
return names, nil
227236
}
228237

229-
// registryHTTPClient is used for all registry HTTP requests with a reasonable timeout.
238+
// registryHTTPClient is used for all registry HTTP requests (both GitHub and static
239+
// sources) with a reasonable timeout to avoid hangs on network issues.
230240
var registryHTTPClient = &http.Client{Timeout: 30 * time.Second}
231241

232242
// fetch performs an HTTP GET with optional auth token.

config/config.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,25 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error)
419419
}
420420

421421
combined.Modules = append(combined.Modules, wfCfg.Modules...)
422+
423+
// Merge external plugin declarations — deduplicate by name (first definition wins).
424+
if wfCfg.Plugins != nil && len(wfCfg.Plugins.External) > 0 {
425+
if combined.Plugins == nil {
426+
combined.Plugins = &PluginsConfig{}
427+
}
428+
existingPlugins := make(map[string]struct{}, len(combined.Plugins.External))
429+
for _, ep := range combined.Plugins.External {
430+
existingPlugins[ep.Name] = struct{}{}
431+
}
432+
for _, ep := range wfCfg.Plugins.External {
433+
if _, exists := existingPlugins[ep.Name]; exists {
434+
continue
435+
}
436+
combined.Plugins.External = append(combined.Plugins.External, ep)
437+
existingPlugins[ep.Name] = struct{}{}
438+
}
439+
}
440+
422441
for k, v := range wfCfg.Workflows {
423442
if existing, exists := combined.Workflows[k]; exists {
424443
// If the existing value is nil (e.g. `http:` with no body in YAML),

config/merge_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"os"
5+
"path/filepath"
56
"reflect"
67
"strings"
78
"testing"
@@ -533,3 +534,68 @@ func writeFileContent(path, content string) error {
533534
func contains(s, substr string) bool {
534535
return strings.Contains(s, substr)
535536
}
537+
538+
func TestMergeApplicationConfig_PluginDedup(t *testing.T) {
539+
dir := t.TempDir()
540+
541+
// Workflow A declares plugin "foo"
542+
wfA := filepath.Join(dir, "a.yaml")
543+
if err := writeFileContent(wfA, `
544+
plugins:
545+
external:
546+
- name: foo
547+
version: "1.0"
548+
repository: "https://example.com/foo"
549+
modules: []
550+
`); err != nil {
551+
t.Fatalf("write a.yaml: %v", err)
552+
}
553+
554+
// Workflow B declares plugin "foo" (duplicate) and "bar"
555+
wfB := filepath.Join(dir, "b.yaml")
556+
if err := writeFileContent(wfB, `
557+
plugins:
558+
external:
559+
- name: foo
560+
version: "2.0"
561+
repository: "https://example.com/foo-v2"
562+
- name: bar
563+
version: "1.0"
564+
repository: "https://example.com/bar"
565+
modules: []
566+
`); err != nil {
567+
t.Fatalf("write b.yaml: %v", err)
568+
}
569+
570+
appCfg := &ApplicationConfig{
571+
Application: ApplicationInfo{
572+
Workflows: []WorkflowRef{
573+
{File: wfA},
574+
{File: wfB},
575+
},
576+
},
577+
}
578+
579+
cfg, err := MergeApplicationConfig(appCfg)
580+
if err != nil {
581+
t.Fatalf("MergeApplicationConfig: %v", err)
582+
}
583+
584+
if cfg.Plugins == nil {
585+
t.Fatal("expected Plugins to be non-nil after merge")
586+
}
587+
if len(cfg.Plugins.External) != 2 {
588+
t.Fatalf("expected 2 plugins (foo + bar), got %d", len(cfg.Plugins.External))
589+
}
590+
591+
// First definition wins — foo should have version "1.0" from workflow A
592+
var fooVer string
593+
for _, p := range cfg.Plugins.External {
594+
if p.Name == "foo" {
595+
fooVer = p.Version
596+
}
597+
}
598+
if fooVer != "1.0" {
599+
t.Errorf("expected foo version 1.0 (first definition wins), got %s", fooVer)
600+
}
601+
}

docs/PLUGIN_AUTHORING.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,17 @@ func (p *Provider) CreateModule(typeName, name string, config map[string]any) (s
101101

102102
## Plugin Manifest
103103

104-
The `plugin.json` declares what your plugin provides. The name should match what you passed to `wfctl plugin init`:
104+
The `plugin.json` at the project root declares what your plugin provides. The `name`
105+
field **must match the short name** you passed to `wfctl plugin init` (e.g. `my-plugin`).
106+
This is the name used by the engine for plugin discovery, the `requires.plugins` dependency
107+
check, and `wfctl plugin install`.
108+
109+
> **Note:** The scaffolded `internal/provider.go` returns a manifest with the name prefixed
110+
> as `workflow-plugin-<short-name>` (e.g. `workflow-plugin-my-plugin`). That longer form is
111+
> the canonical name used in the **public registry** (`workflow-registry`) and in release
112+
> artifact URLs. When referencing your plugin in a workflow config's `requires.plugins` or
113+
> `plugins.external`, use the same short name you put in `plugin.json` — the engine resolves
114+
> both forms automatically.
105115
106116
```json
107117
{

module/pipeline_step_workflow_call.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type WorkflowCallStep struct {
3131
name string
3232
workflow string // target pipeline name
3333
mode WorkflowCallMode // "sync" (default) or "async"
34+
stopPipeline bool // if true, stop parent pipeline after this step completes
3435
inputMapping map[string]string
3536
outputMapping map[string]string
3637
timeout time.Duration
@@ -86,6 +87,10 @@ func NewWorkflowCallStepFactory(lookup PipelineLookupFn) StepFactory {
8687
}
8788
}
8889

90+
if v, ok := cfg["stop_pipeline"].(bool); ok {
91+
step.stopPipeline = v
92+
}
93+
8994
return step, nil
9095
}
9196
}
@@ -101,9 +106,13 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
101106
if s.lookup == nil {
102107
return nil, fmt.Errorf("workflow_call step %q: no pipeline lookup function configured", s.name)
103108
}
104-
target, ok := s.lookup(s.workflow)
109+
workflowName, resolveErr := s.tmpl.Resolve(s.workflow, pc)
110+
if resolveErr != nil {
111+
return nil, fmt.Errorf("workflow_call step %q: failed to resolve workflow name %q: %w", s.name, s.workflow, resolveErr)
112+
}
113+
target, ok := s.lookup(workflowName)
105114
if !ok {
106-
return nil, fmt.Errorf("workflow_call step %q: pipeline %q not found — ensure it is defined in the application config", s.name, s.workflow)
115+
return nil, fmt.Errorf("workflow_call step %q: pipeline %q not found — ensure it is defined in the application config", s.name, workflowName)
107116
}
108117

109118
// Build trigger data from input mapping or fall back to passing all current data
@@ -130,7 +139,7 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
130139
defer cancel()
131140
_, _ = target.Execute(asyncCtx, data) //nolint:errcheck
132141
}(ctx, triggerData)
133-
return &StepResult{Output: map[string]any{"workflow": s.workflow, "mode": "async", "dispatched": true}}, nil
142+
return &StepResult{Output: map[string]any{"workflow": workflowName, "mode": "async", "dispatched": true}, Stop: s.stopPipeline}, nil
134143
}
135144

136145
// Sync mode: apply timeout and wait for result
@@ -139,7 +148,7 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
139148

140149
childCtx, err := target.Execute(syncCtx, triggerData)
141150
if err != nil {
142-
return nil, fmt.Errorf("workflow_call step %q: workflow %q failed: %w", s.name, s.workflow, err)
151+
return nil, fmt.Errorf("workflow_call step %q: workflow %q failed: %w", s.name, workflowName, err)
143152
}
144153

145154
// Map outputs back to parent context
@@ -153,7 +162,7 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S
153162
output["result"] = childCtx.Current
154163
}
155164

156-
return &StepResult{Output: output}, nil
165+
return &StepResult{Output: output, Stop: s.stopPipeline}, nil
157166
}
158167

159168
// Ensure interface satisfaction at compile time.

0 commit comments

Comments
 (0)