From 174f7a475338bc99d63212fb38fade821e72d0f0 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 23 Feb 2026 06:48:35 -0500 Subject: [PATCH 1/2] feat: add multi-workflow application config and cross-workflow invocation (#80) - Add ApplicationConfig format for multi-workflow applications referencing separate workflow YAML files with shared module registry - Add step.workflow_call pipeline step for cross-workflow invocation with sync (call-and-wait) and async (fire-and-forget) modes - Add input/output mapping with template expansion for data passing - Add pipelineRegistry to StdEngine for runtime pipeline lookup - Add BuildFromApplicationConfig() with module name conflict detection - Add auto-detection of application configs in cmd/server - Add step.workflow_call schema to module schemas - Add example multi-workflow app (chat platform with 3 workflows) - Add comprehensive tests for workflow call step and multi-config loading Closes #80 Co-Authored-By: Claude Opus 4.6 --- cmd/server/main.go | 92 +++- cmd/server/main_test.go | 12 +- config/config.go | 123 +++++ engine.go | 66 ++- engine_multi_config_test.go | 482 +++++++++++++++++++ example/multi-workflow/app.yaml | 15 + example/multi-workflow/escalation.yaml | 45 ++ example/multi-workflow/main-loop.yaml | 52 ++ example/multi-workflow/queue-assignment.yaml | 51 ++ module/command_handler_test.go | 1 - module/pipeline_step_workflow_call.go | 159 ++++++ module/pipeline_step_workflow_call_test.go | 388 +++++++++++++++ module/query_handler_test.go | 1 - plugins/api/plugin.go | 14 +- plugins/pipelinesteps/plugin.go | 1 + schema/module_schema.go | 21 + schema/schema.go | 1 + 17 files changed, 1505 insertions(+), 19 deletions(-) create mode 100644 engine_multi_config_test.go create mode 100644 example/multi-workflow/app.yaml create mode 100644 example/multi-workflow/escalation.yaml create mode 100644 example/multi-workflow/main-loop.yaml create mode 100644 example/multi-workflow/queue-assignment.yaml create mode 100644 module/pipeline_step_workflow_call.go create mode 100644 module/pipeline_step_workflow_call_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index aaf59157..785b1229 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -176,16 +176,33 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std // loadConfig loads a workflow configuration from the configured file path, // or returns an empty config if no path is set. -func loadConfig(logger *slog.Logger) (*config.WorkflowConfig, error) { +// If the config file contains an application-level config (multi-workflow), +// the returned WorkflowConfig will be nil and the ApplicationConfig will be set. +func loadConfig(logger *slog.Logger) (*config.WorkflowConfig, *config.ApplicationConfig, error) { if *configFile != "" { + // Peek at the file to detect whether it is an application config. + data, err := os.ReadFile(*configFile) + if err != nil { + return nil, nil, fmt.Errorf("failed to read configuration file: %w", err) + } + + if config.IsApplicationConfig(data) { + logger.Info("Detected multi-workflow application config", "file", *configFile) + appCfg, err := config.LoadApplicationConfig(*configFile) + if err != nil { + return nil, nil, fmt.Errorf("failed to load application configuration: %w", err) + } + return nil, appCfg, nil + } + cfg, err := config.LoadFromFile(*configFile) if err != nil { - return nil, fmt.Errorf("failed to load configuration: %w", err) + return nil, nil, fmt.Errorf("failed to load configuration: %w", err) } - return cfg, nil + return cfg, nil, nil } logger.Info("No config file specified, using empty workflow config") - return config.NewEmptyWorkflowConfig(), nil + return config.NewEmptyWorkflowConfig(), nil, nil } // --------------------------------------------------------------------------- @@ -345,6 +362,60 @@ func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) return app, nil } +// setupFromAppConfig initializes all server components from a multi-workflow +// application config. It merges all workflow files into a combined WorkflowConfig, +// applies the admin config overlay, then builds the engine using +// BuildFromApplicationConfig so cross-workflow pipeline calls are wired up. +func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) (*serverApp, error) { + // Merge all workflow files into a combined config so the admin overlay + // can be applied consistently (module names, route configs, etc.). + combined, err := config.MergeApplicationConfig(appCfg) + if err != nil { + return nil, fmt.Errorf("failed to merge application config: %w", err) + } + + // Apply admin config overlay (admin UI, management routes, etc.). + if err := mergeAdminConfig(logger, combined); err != nil { + return nil, fmt.Errorf("failed to set up admin: %w", err) + } + + // Build the engine using the application config so BuildFromApplicationConfig + // is called (which handles the pipeline registry for step.workflow_call). + // We re-use buildEngineFromAppConfig but pass the merged+admin config separately + // as the admin modules are now part of combined. Since BuildFromApplicationConfig + // calls MergeApplicationConfig internally, we bypass that by calling + // BuildFromConfig on the already-merged config via buildEngine. + engine, loader, registry, err := buildEngine(combined, logger) + if err != nil { + return nil, fmt.Errorf("failed to build engine: %w", err) + } + + sApp := &serverApp{ + engine: engine, + logger: logger, + } + + pool := dynamic.NewInterpreterPool() + aiSvc, deploySvc := initAIService(logger, registry, pool) + initManagementHandlers(logger, engine, combined, sApp, aiSvc, deploySvc, loader, registry) + registerManagementServices(logger, sApp) + + sApp.postStartFuncs = append(sApp.postStartFuncs, func() error { + if err := sApp.initStores(logger); err != nil { + return err + } + return sApp.registerPostStartServices(logger) + }, func() error { + return sApp.importBundles(logger) + }) + + sApp.mgmt.auditLogger = audit.NewLogger(os.Stdout) + sApp.mgmt.auditLogger.LogConfigChange(context.Background(), "system", "server", + "server started with application config: "+appCfg.Application.Name) + + return sApp, nil +} + // mergeAdminConfig loads the embedded admin config and merges admin // modules/routes into the primary config. If --admin-ui-dir (or ADMIN_UI_DIR // env var) is set the static.fileserver root is updated to that path, @@ -1268,13 +1339,20 @@ func main() { logger.Warn("Multi-workflow mode requires the api package (not yet available); falling back to single-config mode") } - // Existing single-config behavior - cfg, err := loadConfig(logger) + // Load configuration — supports both single-workflow and multi-workflow application configs. + cfg, appCfg, err := loadConfig(logger) if err != nil { log.Fatalf("Configuration error: %v", err) } - app, err := setup(logger, cfg) + var app *serverApp + if appCfg != nil { + // Multi-workflow application config: build engine from application config + app, err = setupFromAppConfig(logger, appCfg) + } else { + // Single-workflow config (backward-compatible) + app, err = setup(logger, cfg) + } if err != nil { log.Fatalf("Setup error: %v", err) } diff --git a/cmd/server/main_test.go b/cmd/server/main_test.go index 4f6a86ac..c013cd9f 100644 --- a/cmd/server/main_test.go +++ b/cmd/server/main_test.go @@ -329,13 +329,16 @@ func TestLoadConfig_NoFile(t *testing.T) { *configFile = "" logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) - cfg, err := loadConfig(logger) + cfg, appCfg, err := loadConfig(logger) if err != nil { t.Fatalf("loadConfig failed: %v", err) } if cfg == nil { t.Fatal("expected non-nil config") } + if appCfg != nil { + t.Fatal("expected nil application config for empty configFile") + } if len(cfg.Modules) != 0 { t.Errorf("expected empty modules, got %d", len(cfg.Modules)) } @@ -346,7 +349,7 @@ func TestLoadConfig_InvalidFile(t *testing.T) { defer func() { *configFile = "" }() logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) - _, err := loadConfig(logger) + _, _, err := loadConfig(logger) if err == nil { t.Fatal("expected error for nonexistent config file") } @@ -377,10 +380,13 @@ triggers: {} defer func() { *configFile = "" }() logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) - cfg, err := loadConfig(logger) + cfg, appCfg, err := loadConfig(logger) if err != nil { t.Fatalf("loadConfig failed: %v", err) } + if appCfg != nil { + t.Fatal("expected nil application config for single-workflow YAML") + } if len(cfg.Modules) != 1 { t.Fatalf("expected 1 module, got %d", len(cfg.Modules)) } diff --git a/config/config.go b/config/config.go index 4fd699f5..3ae240e8 100644 --- a/config/config.go +++ b/config/config.go @@ -8,6 +8,67 @@ import ( "gopkg.in/yaml.v3" ) +// WorkflowRef is a reference to a workflow config file within an application config. +type WorkflowRef struct { + // File is the path to the workflow YAML config file (relative to the application config). + File string `json:"file" yaml:"file"` + // Name is an optional override for the workflow's name within the application namespace. + // If empty, the filename stem (without extension) is used. + Name string `json:"name,omitempty" yaml:"name,omitempty"` +} + +// ApplicationInfo holds top-level metadata about a multi-workflow application. +type ApplicationInfo struct { + // Name is the application name. + Name string `json:"name" yaml:"name"` + // Workflows lists the workflow config files that make up this application. + Workflows []WorkflowRef `json:"workflows" yaml:"workflows"` +} + +// ApplicationConfig is the top-level config for a multi-workflow application. +// It references multiple workflow config files that share a module registry. +type ApplicationConfig struct { + // Application holds the application-level metadata and workflow references. + Application ApplicationInfo `json:"application" yaml:"application"` + // ConfigDir is the directory of the application config file, used for resolving relative paths. + ConfigDir string `json:"-" yaml:"-"` +} + +// LoadApplicationConfig loads an application config from a YAML file. +func LoadApplicationConfig(filepath string) (*ApplicationConfig, error) { + data, err := os.ReadFile(filepath) + if err != nil { + return nil, fmt.Errorf("failed to read application config file: %w", err) + } + + var cfg ApplicationConfig + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("failed to parse application config file: %w", err) + } + + // Store the config file's directory for relative path resolution + absPath, err := pathpkg.Abs(filepath) + if err == nil { + cfg.ConfigDir = pathpkg.Dir(absPath) + } + + return &cfg, nil +} + +// IsApplicationConfig returns true if the YAML data contains an application-level config +// (i.e., has an "application" key with a "workflows" section). +func IsApplicationConfig(data []byte) bool { + var probe struct { + Application *struct { + Workflows []any `yaml:"workflows"` + } `yaml:"application"` + } + if err := yaml.Unmarshal(data, &probe); err != nil { + return false + } + return probe.Application != nil && len(probe.Application.Workflows) > 0 +} + // ModuleConfig represents a single module configuration type ModuleConfig struct { Name string `json:"name" yaml:"name"` @@ -98,6 +159,68 @@ func ResolvePathInConfig(cfg map[string]any, path string) string { return path } +// MergeApplicationConfig loads all workflow config files referenced by an +// ApplicationConfig and merges them into a single WorkflowConfig. This is +// useful for callers that need a single combined config (e.g., the server's +// admin merge step) before passing it to the engine. +// +// Module name conflicts across files are reported as errors. +func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) { + if appCfg == nil { + return nil, fmt.Errorf("application config is nil") + } + + combined := NewEmptyWorkflowConfig() + seenModules := make(map[string]string) + + for _, ref := range appCfg.Application.Workflows { + if ref.File == "" { + return nil, fmt.Errorf("application %q: workflow reference has no 'file' field", appCfg.Application.Name) + } + + filePath := ref.File + if !pathpkg.IsAbs(filePath) && appCfg.ConfigDir != "" { + filePath = pathpkg.Join(appCfg.ConfigDir, filePath) + } + + wfCfg, err := LoadFromFile(filePath) + if err != nil { + return nil, fmt.Errorf("application %q: failed to load workflow file %q: %w", appCfg.Application.Name, ref.File, err) + } + + // Derive a name for error messages + wfName := ref.Name + if wfName == "" { + base := pathpkg.Base(filePath) + wfName = base[:len(base)-len(pathpkg.Ext(base))] + } + + for _, modCfg := range wfCfg.Modules { + if existing, conflict := seenModules[modCfg.Name]; conflict { + return nil, fmt.Errorf("application %q: module name conflict: module %q is defined in both %q and %q", + appCfg.Application.Name, modCfg.Name, existing, wfName) + } + seenModules[modCfg.Name] = wfName + } + + combined.Modules = append(combined.Modules, wfCfg.Modules...) + for k, v := range wfCfg.Workflows { + combined.Workflows[k] = v + } + for k, v := range wfCfg.Triggers { + combined.Triggers[k] = v + } + for k, v := range wfCfg.Pipelines { + combined.Pipelines[k] = v + } + if combined.ConfigDir == "" { + combined.ConfigDir = wfCfg.ConfigDir + } + } + + return combined, nil +} + // NewEmptyWorkflowConfig creates a new empty workflow configuration func NewEmptyWorkflowConfig() *WorkflowConfig { return &WorkflowConfig{ diff --git a/engine.go b/engine.go index 44089000..6c8e9f61 100644 --- a/engine.go +++ b/engine.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "path/filepath" "strings" "time" @@ -74,6 +75,10 @@ type StdEngine struct { // triggerConfigWrappers maps trigger type keys to functions that convert // flat pipeline trigger config into the trigger's native format. triggerConfigWrappers map[string]plugin.TriggerConfigWrapperFunc + + // pipelineRegistry holds all registered pipelines by name, enabling + // step.workflow_call to look up sibling pipelines at execution time. + pipelineRegistry map[string]*module.Pipeline } // App returns the underlying modular.Application. @@ -118,7 +123,7 @@ func (e *StdEngine) SetPluginInstaller(installer *plugin.PluginInstaller) { // NewStdEngine creates a new workflow engine func NewStdEngine(app modular.Application, logger modular.Logger) *StdEngine { - return &StdEngine{ + e := &StdEngine{ app: app, workflowHandlers: make([]WorkflowHandler, 0), moduleFactories: make(map[string]ModuleFactory), @@ -130,7 +135,17 @@ func NewStdEngine(app modular.Application, logger modular.Logger) *StdEngine { stepRegistry: module.NewStepRegistry(), triggerTypeMap: make(map[string]string), triggerConfigWrappers: make(map[string]plugin.TriggerConfigWrapperFunc), + pipelineRegistry: make(map[string]*module.Pipeline), } + // Register the step.workflow_call factory with a closure that looks up + // pipelines from this engine's registry at execution time. + e.stepRegistry.Register("step.workflow_call", module.NewWorkflowCallStepFactory( + func(name string) (*module.Pipeline, bool) { + p, ok := e.pipelineRegistry[name] + return p, ok + }, + )) + return e } // SecretsResolver returns the engine's multi-provider secrets resolver. @@ -444,6 +459,52 @@ func (e *StdEngine) BuildFromConfig(cfg *config.WorkflowConfig) error { return nil } +// BuildFromApplicationConfig loads a multi-workflow application config and builds +// the engine from all referenced workflow configs. Each workflow config file is +// parsed independently, and their modules are merged into the shared module +// registry. Module name conflicts across workflow files produce a clear error. +// +// This is the entry point for the application-level multi-workflow feature: +// +// application: +// name: chat-platform +// workflows: +// - file: workflows/main-loop.yaml +// - file: workflows/queue-assignment.yaml +// +// All pipelines defined across workflow files share a single engine and can +// call each other using step.workflow_call. +func (e *StdEngine) BuildFromApplicationConfig(appCfg *config.ApplicationConfig) error { + if appCfg == nil { + return fmt.Errorf("application config is nil") + } + if len(appCfg.Application.Workflows) == 0 { + return fmt.Errorf("application %q has no workflow files defined", appCfg.Application.Name) + } + + e.logger.Info(fmt.Sprintf("Building application %q from %d workflow files", + appCfg.Application.Name, len(appCfg.Application.Workflows))) + + // Use the shared MergeApplicationConfig helper (also used by the server's + // admin config merge step) to load and validate all workflow files. + combined, err := config.MergeApplicationConfig(appCfg) + if err != nil { + return fmt.Errorf("application %q: %w", appCfg.Application.Name, err) + } + + for _, ref := range appCfg.Application.Workflows { + wfName := ref.Name + if wfName == "" { + base := filepath.Base(ref.File) + wfName = strings.TrimSuffix(base, filepath.Ext(base)) + } + e.logger.Info(fmt.Sprintf("Application %q: loaded workflow %q from %q", + appCfg.Application.Name, wfName, ref.File)) + } + + return e.BuildFromConfig(combined) +} + // Start starts all modules and triggers func (e *StdEngine) Start(ctx context.Context) error { err := e.app.Start() @@ -652,6 +713,9 @@ func (e *StdEngine) configurePipelines(pipelineCfg map[string]any) error { } adder.AddPipeline(pipelineName, pipeline) + // Register in the engine's pipeline registry so step.workflow_call can + // look up this pipeline at execution time. + e.pipelineRegistry[pipelineName] = pipeline e.logger.Info(fmt.Sprintf("Configured pipeline: %s (%d steps)", pipelineName, len(steps))) // Create trigger from inline trigger config if present. diff --git a/engine_multi_config_test.go b/engine_multi_config_test.go new file mode 100644 index 00000000..9e1a3354 --- /dev/null +++ b/engine_multi_config_test.go @@ -0,0 +1,482 @@ +package workflow + +import ( + "context" + "io" + "log/slog" + "os" + "path/filepath" + "testing" + + "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/config" + "github.com/GoCodeAlone/workflow/handlers" + "github.com/GoCodeAlone/workflow/module" + "github.com/GoCodeAlone/workflow/plugins/pipelinesteps" +) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func discardSlogLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +// newTestEngine builds a minimal engine with the pipeline steps plugin loaded. +func newTestEngine(t *testing.T) *StdEngine { + t.Helper() + logger := discardSlogLogger() + app := modular.NewStdApplication(nil, logger) + engine := NewStdEngine(app, logger) + if err := engine.LoadPlugin(pipelinesteps.New()); err != nil { + t.Fatalf("LoadPlugin pipelinesteps: %v", err) + } + // Register the PipelineWorkflowHandler (normally done via LoadPlugin) + engine.RegisterWorkflowHandler(handlers.NewPipelineWorkflowHandler()) + return engine +} + +// writeTempYAML writes YAML content to a temp file and returns its path. +func writeTempYAML(t *testing.T, dir, filename, content string) string { + t.Helper() + path := filepath.Join(dir, filename) + if err := os.WriteFile(path, []byte(content), 0o600); err != nil { + t.Fatalf("writeTempYAML %s: %v", filename, err) + } + return path +} + +// --------------------------------------------------------------------------- +// Tests for config.IsApplicationConfig +// --------------------------------------------------------------------------- + +func TestIsApplicationConfig_True(t *testing.T) { + yaml := ` +application: + name: my-app + workflows: + - file: a.yaml + - file: b.yaml +` + if !config.IsApplicationConfig([]byte(yaml)) { + t.Fatal("expected IsApplicationConfig to return true") + } +} + +func TestIsApplicationConfig_False_SingleWorkflow(t *testing.T) { + yaml := ` +modules: [] +workflows: {} +triggers: {} +` + if config.IsApplicationConfig([]byte(yaml)) { + t.Fatal("expected IsApplicationConfig to return false for single-workflow config") + } +} + +func TestIsApplicationConfig_False_EmptyWorkflows(t *testing.T) { + yaml := ` +application: + name: my-app + workflows: [] +` + if config.IsApplicationConfig([]byte(yaml)) { + t.Fatal("expected IsApplicationConfig to return false when workflows list is empty") + } +} + +// --------------------------------------------------------------------------- +// Tests for config.LoadApplicationConfig +// --------------------------------------------------------------------------- + +func TestLoadApplicationConfig_Basic(t *testing.T) { + dir := t.TempDir() + appYAML := ` +application: + name: chat-platform + workflows: + - file: main-loop.yaml + - file: queue-assignment.yaml + name: queue-assign +` + appPath := writeTempYAML(t, dir, "app.yaml", appYAML) + + cfg, err := config.LoadApplicationConfig(appPath) + if err != nil { + t.Fatalf("LoadApplicationConfig: %v", err) + } + if cfg.Application.Name != "chat-platform" { + t.Errorf("application name = %q, want chat-platform", cfg.Application.Name) + } + if len(cfg.Application.Workflows) != 2 { + t.Fatalf("expected 2 workflows, got %d", len(cfg.Application.Workflows)) + } + if cfg.Application.Workflows[0].File != "main-loop.yaml" { + t.Errorf("workflow 0 file = %q, want main-loop.yaml", cfg.Application.Workflows[0].File) + } + if cfg.Application.Workflows[1].Name != "queue-assign" { + t.Errorf("workflow 1 name = %q, want queue-assign", cfg.Application.Workflows[1].Name) + } + if cfg.ConfigDir == "" { + t.Error("expected ConfigDir to be set") + } +} + +func TestLoadApplicationConfig_FileNotFound(t *testing.T) { + _, err := config.LoadApplicationConfig("/nonexistent/path/app.yaml") + if err == nil { + t.Fatal("expected error for nonexistent file") + } +} + +// --------------------------------------------------------------------------- +// Tests for StdEngine.BuildFromApplicationConfig +// --------------------------------------------------------------------------- + +func TestBuildFromApplicationConfig_Nil(t *testing.T) { + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(nil) + if err == nil { + t.Fatal("expected error for nil application config") + } +} + +func TestBuildFromApplicationConfig_EmptyWorkflows(t *testing.T) { + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "empty-app", + Workflows: []config.WorkflowRef{}, + }, + }) + if err == nil { + t.Fatal("expected error for empty workflow list") + } +} + +func TestBuildFromApplicationConfig_MissingFile(t *testing.T) { + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "broken-app", + Workflows: []config.WorkflowRef{ + {File: "/nonexistent/workflow.yaml"}, + }, + }, + }) + if err == nil { + t.Fatal("expected error for missing workflow file") + } +} + +func TestBuildFromApplicationConfig_ModuleNameConflict(t *testing.T) { + dir := t.TempDir() + + // Both workflow files define a module named "shared-cache" + // The module type doesn't matter for conflict detection (it happens before build) + wfA := ` +modules: + - name: shared-cache + type: storage.sqlite + config: + dsn: ":memory:" +workflows: {} +triggers: {} +` + wfB := ` +modules: + - name: shared-cache + type: storage.sqlite + config: + dsn: ":memory:" +workflows: {} +triggers: {} +` + writeTempYAML(t, dir, "a.yaml", wfA) + writeTempYAML(t, dir, "b.yaml", wfB) + + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "conflict-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "a.yaml")}, + {File: filepath.Join(dir, "b.yaml")}, + }, + }, + ConfigDir: dir, + }) + if err == nil { + t.Fatal("expected error for module name conflict") + } +} + +func TestBuildFromApplicationConfig_MultipleWorkflows_MergesPipelines(t *testing.T) { + dir := t.TempDir() + + // Workflow A: defines an "echo" pipeline + wfA := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + echo: + steps: + - name: set-msg + type: step.set + config: + values: + message: "hello from echo" +` + // Workflow B: defines a "greet" pipeline + wfB := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + greet: + steps: + - name: set-greeting + type: step.set + config: + values: + greeting: "hello world" +` + writeTempYAML(t, dir, "wf-a.yaml", wfA) + writeTempYAML(t, dir, "wf-b.yaml", wfB) + + engine := newTestEngine(t) + if err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "merged-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "wf-a.yaml")}, + {File: filepath.Join(dir, "wf-b.yaml")}, + }, + }, + ConfigDir: dir, + }); err != nil { + t.Fatalf("BuildFromApplicationConfig: %v", err) + } + + // Both pipelines should be reachable via the engine's workflow handler + ctx := context.Background() + + // Trigger the echo pipeline + result, err := engine.TriggerWorkflowResult(ctx, "pipeline:echo", "", nil) + if err != nil { + t.Fatalf("TriggerWorkflowResult echo: %v", err) + } + if result["message"] != "hello from echo" { + t.Errorf("echo pipeline message = %v, want 'hello from echo'", result["message"]) + } + + // Trigger the greet pipeline + result, err = engine.TriggerWorkflowResult(ctx, "pipeline:greet", "", nil) + if err != nil { + t.Fatalf("TriggerWorkflowResult greet: %v", err) + } + if result["greeting"] != "hello world" { + t.Errorf("greet pipeline greeting = %v, want 'hello world'", result["greeting"]) + } +} + +func TestBuildFromApplicationConfig_WorkflowCall_CrossPipelineInvocation(t *testing.T) { + dir := t.TempDir() + + // Workflow A: a "validate" pipeline that uses step.workflow_call to invoke + // the "enrich" pipeline defined in Workflow B. + wfA := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + validate: + steps: + - name: call-enrich + type: step.workflow_call + config: + workflow: enrich + mode: sync + input: + raw_id: "{{ .order_id }}" + output_mapping: + enriched_id: enriched_id +` + // Workflow B: defines the "enrich" pipeline that transforms an ID + wfB := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + enrich: + steps: + - name: set-enriched + type: step.set + config: + values: + enriched_id: "ENRICHED-{{ .raw_id }}" +` + writeTempYAML(t, dir, "wf-a.yaml", wfA) + writeTempYAML(t, dir, "wf-b.yaml", wfB) + + engine := newTestEngine(t) + if err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "cross-call-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "wf-a.yaml")}, + {File: filepath.Join(dir, "wf-b.yaml")}, + }, + }, + ConfigDir: dir, + }); err != nil { + t.Fatalf("BuildFromApplicationConfig: %v", err) + } + + ctx := context.Background() + result, err := engine.TriggerWorkflowResult(ctx, "pipeline:validate", "", map[string]any{ + "order_id": "42", + }) + if err != nil { + t.Fatalf("TriggerWorkflowResult validate: %v", err) + } + + if result["enriched_id"] != "ENRICHED-42" { + t.Errorf("enriched_id = %v, want ENRICHED-42", result["enriched_id"]) + } +} + +func TestBuildFromApplicationConfig_NameOverride(t *testing.T) { + dir := t.TempDir() + + wfYAML := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + my-pipe: + steps: + - name: noop + type: step.set + config: + values: + done: true +` + filePath := writeTempYAML(t, dir, "some-workflow-file.yaml", wfYAML) + + // With explicit Name override, the file should still load correctly. + engine := newTestEngine(t) + if err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "named-app", + Workflows: []config.WorkflowRef{ + {File: filePath, Name: "my-custom-name"}, + }, + }, + ConfigDir: dir, + }); err != nil { + t.Fatalf("BuildFromApplicationConfig: %v", err) + } + + ctx := context.Background() + result, err := engine.TriggerWorkflowResult(ctx, "pipeline:my-pipe", "", nil) + if err != nil { + t.Fatalf("TriggerWorkflowResult: %v", err) + } + if result["done"] != true { + t.Errorf("done = %v, want true", result["done"]) + } +} + +// --------------------------------------------------------------------------- +// TriggerWorkflowResult helper for tests +// --------------------------------------------------------------------------- + +// TriggerWorkflowResult is a test helper that executes a workflow and returns +// the result map. It mirrors TriggerWorkflow but returns the result for +// assertions without requiring the engine to expose ExecuteWorkflow directly. +func (e *StdEngine) TriggerWorkflowResult(ctx context.Context, workflowType string, action string, data map[string]any) (map[string]any, error) { + for _, handler := range e.workflowHandlers { + if handler.CanHandle(workflowType) { + if data == nil { + data = map[string]any{} + } + return handler.ExecuteWorkflow(ctx, workflowType, action, data) + } + } + return nil, nil +} + +// --------------------------------------------------------------------------- +// Tests for step.workflow_call in pipeline registry +// --------------------------------------------------------------------------- + +func TestEngine_PipelineRegistry_PopulatedAfterBuild(t *testing.T) { + dir := t.TempDir() + + wfYAML := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + my-pipeline: + steps: + - name: noop + type: step.set + config: + values: + ok: true +` + writeTempYAML(t, dir, "wf.yaml", wfYAML) + + engine := newTestEngine(t) + if err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "test-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "wf.yaml")}, + }, + }, + ConfigDir: dir, + }); err != nil { + t.Fatalf("BuildFromApplicationConfig: %v", err) + } + + // The pipeline should be in the registry + p, ok := engine.pipelineRegistry["my-pipeline"] + if !ok { + t.Fatal("expected 'my-pipeline' to be in pipeline registry") + } + if p == nil { + t.Fatal("expected non-nil pipeline in registry") + } +} + +func TestEngine_WorkflowCallStep_RegistryLookup(t *testing.T) { + // Test that step.workflow_call can look up pipelines via the engine's registry + logger := discardSlogLogger() + app := modular.NewStdApplication(nil, logger) + engine := NewStdEngine(app, logger) + + // Manually add a pipeline to the registry + targetPipeline := &module.Pipeline{ + Name: "target", + Steps: []module.PipelineStep{ + &module.SetStep{}, + }, + } + engine.pipelineRegistry["target"] = targetPipeline + + // The step.workflow_call factory should use the registry lookup + step, err := engine.stepRegistry.Create("step.workflow_call", "test-call", map[string]any{ + "workflow": "target", + }, app) + if err != nil { + t.Fatalf("stepRegistry.Create: %v", err) + } + if step == nil { + t.Fatal("expected non-nil step") + } +} diff --git a/example/multi-workflow/app.yaml b/example/multi-workflow/app.yaml new file mode 100644 index 00000000..be8e8c9b --- /dev/null +++ b/example/multi-workflow/app.yaml @@ -0,0 +1,15 @@ +# Multi-workflow application: Chat Platform +# +# This application config references three separate workflow YAML files. +# Each workflow file defines its own pipelines, but they share a single +# engine instance and can call each other using step.workflow_call. +# +# Usage: +# go run ./cmd/server -config example/multi-workflow/app.yaml + +application: + name: chat-platform + workflows: + - file: main-loop.yaml + - file: queue-assignment.yaml + - file: escalation.yaml diff --git a/example/multi-workflow/escalation.yaml b/example/multi-workflow/escalation.yaml new file mode 100644 index 00000000..350f7647 --- /dev/null +++ b/example/multi-workflow/escalation.yaml @@ -0,0 +1,45 @@ +# escalation.yaml — Conversation escalation pipeline +# +# This workflow handles escalation of urgent conversations to +# senior responders or supervisors. +# +# Called by main-loop.yaml via step.workflow_call when urgency is high. + +modules: [] + +workflows: {} + +triggers: {} + +pipelines: + # Escalate an urgent conversation to a supervisor + escalate-conversation: + steps: + # Step 1: Flag as escalated + - name: flag-escalation + type: step.set + config: + values: + escalation_level: "supervisor" + original_urgency: "{{ .urgency }}" + escalated: true + + # Step 2: Assign to escalation queue + - name: assign-escalation-queue + type: step.set + config: + values: + queue_name: "escalation-queue" + responder_id: "supervisor-01" + + # Step 3: Optionally call back into queue-assignment to record + # the escalation as a special assignment. This shows that pipelines + # from different files can freely call each other in any direction. + - name: notify-escalation + type: step.workflow_call + config: + workflow: notify-responder + mode: async + input: + responder_id: "supervisor-01" + conversation_id: "{{ .conversation_id }}" diff --git a/example/multi-workflow/main-loop.yaml b/example/multi-workflow/main-loop.yaml new file mode 100644 index 00000000..ac5d294d --- /dev/null +++ b/example/multi-workflow/main-loop.yaml @@ -0,0 +1,52 @@ +# main-loop.yaml — Main conversation intake pipeline +# +# This workflow receives incoming messages, classifies them, and routes +# to either the queue-assignment or escalation workflow via step.workflow_call. + +modules: [] + +workflows: {} + +triggers: {} + +pipelines: + # Incoming message handler + handle-message: + steps: + # Step 1: Parse and normalize the incoming message + - name: parse-message + type: step.set + config: + values: + conversation_id: "{{ .conversation_id }}" + channel: "{{ .channel }}" + message_text: "{{ .message_text }}" + urgency: "{{ .urgency | default \"normal\" }}" + + # Step 2: Route to queue-assignment for normal messages + # or escalation for urgent ones based on the urgency field. + # + # In a real deployment, you would use step.conditional here to branch, + # then call either queue-assignment or escalation. For this example we + # call queue-assignment directly to demonstrate the cross-workflow call. + - name: assign-to-queue + type: step.workflow_call + config: + workflow: assign-conversation + mode: sync + input: + conversation_id: "{{ .conversation_id }}" + channel: "{{ .channel }}" + urgency: "{{ .urgency }}" + output_mapping: + assigned_responder: responder_id + assigned_queue: queue_name + + # Step 3: Record the assignment + - name: record-assignment + type: step.set + config: + values: + status: "assigned" + responder: "{{ .assigned_responder }}" + queue: "{{ .assigned_queue }}" diff --git a/example/multi-workflow/queue-assignment.yaml b/example/multi-workflow/queue-assignment.yaml new file mode 100644 index 00000000..2f54cfbc --- /dev/null +++ b/example/multi-workflow/queue-assignment.yaml @@ -0,0 +1,51 @@ +# queue-assignment.yaml — Conversation queue assignment pipeline +# +# This workflow receives a conversation and assigns it to the right +# responder and queue based on channel and urgency. +# +# Called by main-loop.yaml via step.workflow_call. + +modules: [] + +workflows: {} + +triggers: {} + +pipelines: + # Core assignment logic: resolves the best responder and queue + assign-conversation: + steps: + # Step 1: Determine which queue to use based on channel + - name: resolve-queue + type: step.set + config: + values: + # In production this would call an external service or DB lookup. + # For demonstration, we derive the queue from the channel field. + queue_name: "{{ .channel }}-queue" + + # Step 2: Pick a responder (simplified round-robin placeholder) + - name: pick-responder + type: step.set + config: + values: + responder_id: "responder-{{ .urgency }}-01" + + # Step 3: Record the assignment + - name: mark-assigned + type: step.set + config: + values: + assignment_status: "confirmed" + + # Async notification: fire-and-forget notification to the assigned responder. + # Called with mode: async so the caller doesn't wait for the notification. + notify-responder: + steps: + - name: build-notification + type: step.set + config: + values: + notification_type: "new_conversation" + recipient: "{{ .responder_id }}" + conversation_id: "{{ .conversation_id }}" diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 7819aad4..14605036 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/pipeline_step_workflow_call.go b/module/pipeline_step_workflow_call.go new file mode 100644 index 00000000..373b6efc --- /dev/null +++ b/module/pipeline_step_workflow_call.go @@ -0,0 +1,159 @@ +package module + +import ( + "context" + "fmt" + "time" + + "github.com/CrisisTextLine/modular" +) + +// WorkflowCallMode determines how a workflow_call step waits for results. +type WorkflowCallMode string + +const ( + // WorkflowCallModeSync executes the target pipeline synchronously and maps outputs back. + WorkflowCallModeSync WorkflowCallMode = "sync" + // WorkflowCallModeAsync fires the target pipeline and returns immediately without waiting. + WorkflowCallModeAsync WorkflowCallMode = "async" +) + +// PipelineLookupFn is a function that resolves a named pipeline by name. +// The engine provides this when building a WorkflowCallStep so the step can +// locate sibling pipelines at execution time without taking a direct dependency +// on the engine. +type PipelineLookupFn func(name string) (*Pipeline, bool) + +// WorkflowCallStep invokes another pipeline registered in the same engine. +// It supports synchronous and asynchronous execution modes with input/output +// template mapping identical to the sub_workflow step pattern. +type WorkflowCallStep struct { + name string + workflow string // target pipeline name + mode WorkflowCallMode // "sync" (default) or "async" + inputMapping map[string]string + outputMapping map[string]string + timeout time.Duration + lookup PipelineLookupFn + tmpl *TemplateEngine +} + +// NewWorkflowCallStepFactory returns a StepFactory for step.workflow_call. +// The lookup function is captured by closure so the step can resolve target +// pipelines at execution time (supporting pipelines registered after factory creation). +func NewWorkflowCallStepFactory(lookup PipelineLookupFn) StepFactory { + return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) { + workflowName, _ := cfg["workflow"].(string) + if workflowName == "" { + return nil, fmt.Errorf("workflow_call step %q: 'workflow' is required", name) + } + + mode := WorkflowCallModeSync + if m, ok := cfg["mode"].(string); ok && m == string(WorkflowCallModeAsync) { + mode = WorkflowCallModeAsync + } + + step := &WorkflowCallStep{ + name: name, + workflow: workflowName, + mode: mode, + timeout: 30 * time.Second, + lookup: lookup, + tmpl: NewTemplateEngine(), + } + + if im, ok := cfg["input"].(map[string]any); ok { + step.inputMapping = make(map[string]string, len(im)) + for k, v := range im { + if s, ok := v.(string); ok { + step.inputMapping[k] = s + } + } + } + + if om, ok := cfg["output_mapping"].(map[string]any); ok { + step.outputMapping = make(map[string]string, len(om)) + for k, v := range om { + if s, ok := v.(string); ok { + step.outputMapping[k] = s + } + } + } + + if timeout, ok := cfg["timeout"].(string); ok && timeout != "" { + if d, err := time.ParseDuration(timeout); err == nil { + step.timeout = d + } + } + + return step, nil + } +} + +// Name returns the step name. +func (s *WorkflowCallStep) Name() string { return s.name } + +// Execute runs the target workflow. In sync mode it waits for the result and +// maps outputs back into the parent context. In async mode it dispatches the +// child pipeline in a goroutine and returns immediately. +func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + // Resolve the target pipeline + if s.lookup == nil { + return nil, fmt.Errorf("workflow_call step %q: no pipeline lookup function configured", s.name) + } + target, ok := s.lookup(s.workflow) + if !ok { + return nil, fmt.Errorf("workflow_call step %q: pipeline %q not found — ensure it is defined in the application config", s.name, s.workflow) + } + + // Build trigger data from input mapping or fall back to passing all current data + triggerData := make(map[string]any) + if s.inputMapping != nil { + for childKey, tmplExpr := range s.inputMapping { + resolved, resolveErr := s.tmpl.Resolve(tmplExpr, pc) + if resolveErr != nil { + return nil, fmt.Errorf("workflow_call step %q: failed to resolve input %q: %w", s.name, childKey, resolveErr) + } + triggerData[childKey] = resolved + } + } else { + for k, v := range pc.Current { + triggerData[k] = v + } + } + + if s.mode == WorkflowCallModeAsync { + // Fire-and-forget: run in background goroutine with its own timeout + go func() { + asyncCtx, cancel := context.WithTimeout(context.Background(), s.timeout) + defer cancel() + _, _ = target.Execute(asyncCtx, triggerData) //nolint:errcheck + }() + return &StepResult{Output: map[string]any{"workflow": s.workflow, "mode": "async", "dispatched": true}}, nil + } + + // Sync mode: apply timeout and wait for result + syncCtx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() + + childCtx, err := target.Execute(syncCtx, triggerData) + if err != nil { + return nil, fmt.Errorf("workflow_call step %q: workflow %q failed: %w", s.name, s.workflow, err) + } + + // Map outputs back to parent context + output := make(map[string]any) + if s.outputMapping != nil { + for parentKey, childPath := range s.outputMapping { + output[parentKey] = resolveOutputPath(childCtx, childPath) + } + } else { + // No explicit mapping — return all child outputs under "result" + output["result"] = childCtx.Current + } + + return &StepResult{Output: output}, nil +} + +// Ensure interface satisfaction at compile time. +var _ PipelineStep = (*WorkflowCallStep)(nil) diff --git a/module/pipeline_step_workflow_call_test.go b/module/pipeline_step_workflow_call_test.go new file mode 100644 index 00000000..da1a39ef --- /dev/null +++ b/module/pipeline_step_workflow_call_test.go @@ -0,0 +1,388 @@ +package module + +import ( + "context" + "fmt" + "testing" + "time" +) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +// fixedOutputPipeline returns a *Pipeline that always executes a single step +// producing the given output map. +func fixedOutputPipeline(name string, output map[string]any) *Pipeline { + return &Pipeline{ + Name: name, + Steps: []PipelineStep{&echoStep{name: "echo", output: output}}, + } +} + +// capturingPipeline returns a *Pipeline whose only step captures the trigger +// data it receives. The captured map is written to *captured after Execute. +func capturingPipeline(name string, captured *map[string]any) *Pipeline { + cs := &inputCapturingStep{name: "capture"} + p := &Pipeline{Name: name, Steps: []PipelineStep{cs}} + // After Execute, copy step's captured data to the output pointer. + _ = captured + // We return a pipeline with a wrapper step that updates *captured. + p.Steps = []PipelineStep{&triggerDataCapture{inner: cs, out: captured}} + return p +} + +// triggerDataCapture wraps inputCapturingStep and copies result to *out. +type triggerDataCapture struct { + inner *inputCapturingStep + out *map[string]any +} + +func (t *triggerDataCapture) Name() string { return t.inner.Name() } +func (t *triggerDataCapture) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + r, err := t.inner.Execute(ctx, pc) + if t.out != nil { + *t.out = t.inner.captured + } + return r, err +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +func TestWorkflowCallStep_MissingWorkflowField(t *testing.T) { + factory := NewWorkflowCallStepFactory(func(name string) (*Pipeline, bool) { return nil, false }) + _, err := factory("missing-wf", map[string]any{}, nil) + if err == nil { + t.Fatal("expected error when workflow field is missing") + } +} + +func TestWorkflowCallStep_PipelineNotFound(t *testing.T) { + lookup := func(name string) (*Pipeline, bool) { return nil, false } + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("call", map[string]any{"workflow": "nonexistent"}, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when pipeline not found") + } +} + +func TestWorkflowCallStep_NilLookup(t *testing.T) { + factory := NewWorkflowCallStepFactory(nil) + step, err := factory("call", map[string]any{"workflow": "some-pipeline"}, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when lookup is nil") + } +} + +func TestWorkflowCallStep_SyncMode_DefaultOutput(t *testing.T) { + target := fixedOutputPipeline("target", map[string]any{"status": "ok", "id": "123"}) + lookup := func(name string) (*Pipeline, bool) { + if name == "target" { + return target, true + } + return nil, false + } + + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("call-target", map[string]any{ + "workflow": "target", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{"order_id": "ORD-001"}, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + // With no output_mapping, all child outputs are returned under "result" + resultMap, ok := result.Output["result"].(map[string]any) + if !ok { + t.Fatalf("expected result to be map[string]any, got %T", result.Output["result"]) + } + if resultMap["status"] != "ok" { + t.Errorf("result.status = %v, want ok", resultMap["status"]) + } + if resultMap["id"] != "123" { + t.Errorf("result.id = %v, want 123", resultMap["id"]) + } +} + +func TestWorkflowCallStep_SyncMode_OutputMapping(t *testing.T) { + target := fixedOutputPipeline("target", map[string]any{"responder_id": "agent-42", "queue": "tier1"}) + lookup := func(name string) (*Pipeline, bool) { + return target, name == "target" + } + + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("assign", map[string]any{ + "workflow": "target", + "output_mapping": map[string]any{ + "assigned_responder": "responder_id", + "assigned_queue": "queue", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["assigned_responder"] != "agent-42" { + t.Errorf("assigned_responder = %v, want agent-42", result.Output["assigned_responder"]) + } + if result.Output["assigned_queue"] != "tier1" { + t.Errorf("assigned_queue = %v, want tier1", result.Output["assigned_queue"]) + } +} + +func TestWorkflowCallStep_SyncMode_InputMapping(t *testing.T) { + var captured map[string]any + target := capturingPipeline("target", &captured) + lookup := func(name string) (*Pipeline, bool) { + return target, name == "target" + } + + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("call-with-input", map[string]any{ + "workflow": "target", + "input": map[string]any{ + "conversation_id": "{{ .conv_id }}", + "priority": "{{ .urgency }}", + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "conv_id": "CONV-99", + "urgency": "high", + }, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if captured["conversation_id"] != "CONV-99" { + t.Errorf("conversation_id = %v, want CONV-99", captured["conversation_id"]) + } + if captured["priority"] != "high" { + t.Errorf("priority = %v, want high", captured["priority"]) + } +} + +func TestWorkflowCallStep_SyncMode_PassthroughInput(t *testing.T) { + // When no input mapping is specified, all current context data is passed through + var captured map[string]any + target := capturingPipeline("target", &captured) + lookup := func(name string) (*Pipeline, bool) { + return target, name == "target" + } + + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("passthrough", map[string]any{ + "workflow": "target", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(map[string]any{ + "order_id": "ORD-007", + "amount": "49.99", + }, nil) + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if captured["order_id"] != "ORD-007" { + t.Errorf("order_id passthrough = %v, want ORD-007", captured["order_id"]) + } + if captured["amount"] != "49.99" { + t.Errorf("amount passthrough = %v, want 49.99", captured["amount"]) + } +} + +func TestWorkflowCallStep_AsyncMode(t *testing.T) { + done := make(chan struct{}, 1) + asyncPipeline := &Pipeline{ + Name: "async-target", + Steps: []PipelineStep{ + &callbackStep{ + name: "notify", + fn: func() { + done <- struct{}{} + }, + }, + }, + } + lookup := func(name string) (*Pipeline, bool) { + return asyncPipeline, name == "async-target" + } + + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("fire-and-forget", map[string]any{ + "workflow": "async-target", + "mode": "async", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + // Should return immediately with dispatch confirmation + if result.Output["dispatched"] != true { + t.Errorf("expected dispatched=true, got %v", result.Output["dispatched"]) + } + if result.Output["mode"] != "async" { + t.Errorf("expected mode=async, got %v", result.Output["mode"]) + } + + // Wait for the async pipeline to actually run + select { + case <-done: + // success + case <-time.After(2 * time.Second): + t.Fatal("async pipeline did not execute within timeout") + } +} + +func TestWorkflowCallStep_Timeout(t *testing.T) { + slowPipeline := &Pipeline{ + Name: "slow", + Steps: []PipelineStep{ + &sleepStep{name: "sleep", duration: 5 * time.Second}, + }, + } + lookup := func(name string) (*Pipeline, bool) { + return slowPipeline, name == "slow" + } + + factory := NewWorkflowCallStepFactory(lookup) + step, err := factory("timeout-call", map[string]any{ + "workflow": "slow", + "timeout": "50ms", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected timeout error") + } +} + +func TestWorkflowCallStep_Name(t *testing.T) { + factory := NewWorkflowCallStepFactory(func(name string) (*Pipeline, bool) { return nil, false }) + step, err := factory("my-call-step", map[string]any{"workflow": "target"}, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + if step.Name() != "my-call-step" { + t.Errorf("Name() = %q, want my-call-step", step.Name()) + } +} + +func TestWorkflowCallStep_DefaultMode_IsSync(t *testing.T) { + target := fixedOutputPipeline("tgt", map[string]any{"done": true}) + factory := NewWorkflowCallStepFactory(func(name string) (*Pipeline, bool) { + return target, name == "tgt" + }) + step, err := factory("no-mode", map[string]any{"workflow": "tgt"}, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + // Sync mode: result should contain child outputs, not dispatch info + if _, ok := result.Output["dispatched"]; ok { + t.Error("expected sync mode but got async dispatch confirmation") + } + if result.Output["result"] == nil { + t.Error("expected sync result to have 'result' key") + } +} + +func TestWorkflowCallStep_ChildError_PropagatesInSync(t *testing.T) { + failPipeline := &Pipeline{ + Name: "fail", + Steps: []PipelineStep{ + &failStep{name: "boom", err: fmt.Errorf("child workflow error")}, + }, + } + factory := NewWorkflowCallStepFactory(func(name string) (*Pipeline, bool) { + return failPipeline, name == "fail" + }) + step, err := factory("call-fail", map[string]any{"workflow": "fail"}, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error from failing child workflow") + } +} + +// --------------------------------------------------------------------------- +// Additional test step helpers +// --------------------------------------------------------------------------- + +// callbackStep calls a callback function when executed. +type callbackStep struct { + name string + fn func() +} + +func (s *callbackStep) Name() string { return s.name } +func (s *callbackStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { + if s.fn != nil { + s.fn() + } + return &StepResult{Output: map[string]any{"called": true}}, nil +} + +// failStep always returns an error. +type failStep struct { + name string + err error +} + +func (s *failStep) Name() string { return s.name } +func (s *failStep) Execute(_ context.Context, _ *PipelineContext) (*StepResult, error) { + return nil, s.err +} diff --git a/module/query_handler_test.go b/module/query_handler_test.go index a8243538..132ebab5 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -299,4 +299,3 @@ func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/plugins/api/plugin.go b/plugins/api/plugin.go index c3e1fa49..9ef86b44 100644 --- a/plugins/api/plugin.go +++ b/plugins/api/plugin.go @@ -100,12 +100,14 @@ func New() *Plugin { return &Plugin{ // Default constructors wrap the concrete module constructors, adapting // their return types to modular.Module via implicit interface satisfaction. - newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, - newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, - newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, - newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, - newWorkflowRegistry: func(name, storageBackend string) modular.Module { return module.NewWorkflowRegistry(name, storageBackend) }, - newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, + newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, + newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, + newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, + newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, + newWorkflowRegistry: func(name, storageBackend string) modular.Module { + return module.NewWorkflowRegistry(name, storageBackend) + }, + newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, newProcessingStep: func(name string, cfg module.ProcessingStepConfig) modular.Module { return module.NewProcessingStep(name, cfg) }, diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index 49b4591e..69a1c384 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -60,6 +60,7 @@ func New() *Plugin { "step.db_query", "step.db_exec", "step.json_response", + "step.workflow_call", }, WorkflowTypes: []string{"pipeline"}, Capabilities: []plugin.CapabilityDecl{ diff --git a/schema/module_schema.go b/schema/module_schema.go index a69cdd53..0595a668 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1272,6 +1272,27 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { DefaultConfig: map[string]any{"timeout": "30s"}, }) + // ----------------------------------------------------------------------- + // Cross-workflow call step (multi-workflow composition) + // ----------------------------------------------------------------------- + + r.Register(&ModuleSchema{ + Type: "step.workflow_call", + Label: "Workflow Call", + Category: "composition", + Description: "Invokes another pipeline registered in the same engine application. Supports sync (call & wait) and async (fire-and-forget) modes with input/output mapping.", + Inputs: []ServiceIODef{{Name: "context", Type: "PipelineContext", Description: "Pipeline context with input data"}}, + Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Called workflow result with mapped outputs (sync mode) or dispatch confirmation (async mode)"}}, + ConfigFields: []ConfigFieldDef{ + {Key: "workflow", Label: "Workflow", Type: FieldTypeString, Required: true, Description: "Name of the target pipeline to call (must be registered in the same engine)", Placeholder: "queue-assignment"}, + {Key: "mode", Label: "Mode", Type: FieldTypeSelect, Options: []string{"sync", "async"}, DefaultValue: "sync", Description: "Execution mode: 'sync' waits for result, 'async' fires and returns immediately"}, + {Key: "input", Label: "Input Mapping", Type: FieldTypeJSON, Description: "Map of target pipeline input keys to template expressions from the current context (e.g. {\"conversation_id\": \"{{ .conversation_id }}\"}). If omitted, all current context data is passed through."}, + {Key: "output_mapping", Label: "Output Mapping", Type: FieldTypeJSON, Description: "Map of parent context keys to target pipeline output paths (e.g. {\"assigned_responder\": \"responder_id\"}). If omitted, all outputs are returned under 'result'."}, + {Key: "timeout", Label: "Timeout", Type: FieldTypeDuration, DefaultValue: "30s", Description: "Maximum execution time for the called workflow (applies to both sync and async modes)"}, + }, + DefaultConfig: map[string]any{"mode": "sync", "timeout": "30s"}, + }) + // ----------------------------------------------------------------------- // AI pipeline steps // ----------------------------------------------------------------------- diff --git a/schema/schema.go b/schema/schema.go index 5d2e60a7..e4ba24f0 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -138,6 +138,7 @@ var coreModuleTypes = []string{ "step.sub_workflow", "step.transform", "step.validate", + "step.workflow_call", "storage.gcs", "storage.local", "storage.s3", From 0e1f61a5e8a4b8484e8e6ee93dcf6fd35b59d3e7 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 11:23:57 -0500 Subject: [PATCH 2/2] fix: conflict detection for triggers/pipelines in MergeApplicationConfig, async context propagation, comment accuracy (#145) * Initial plan * fix: add conflict detection for triggers/pipelines, fix async context, clarify comment Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- cmd/server/main.go | 10 ++- config/config.go | 20 ++++++ engine_multi_config_test.go | 94 +++++++++++++++++++++++++++ module/pipeline_step_workflow_call.go | 11 ++-- 4 files changed, 124 insertions(+), 11 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 785b1229..e0cfda48 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -379,12 +379,10 @@ func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) ( return nil, fmt.Errorf("failed to set up admin: %w", err) } - // Build the engine using the application config so BuildFromApplicationConfig - // is called (which handles the pipeline registry for step.workflow_call). - // We re-use buildEngineFromAppConfig but pass the merged+admin config separately - // as the admin modules are now part of combined. Since BuildFromApplicationConfig - // calls MergeApplicationConfig internally, we bypass that by calling - // BuildFromConfig on the already-merged config via buildEngine. + // Build the engine from the already-merged application config (including the + // admin overlay). The merged config is passed directly to buildEngine, which + // internally uses BuildFromConfig and ensures features like the pipeline + // registry for step.workflow_call are configured correctly. engine, loader, registry, err := buildEngine(combined, logger) if err != nil { return nil, fmt.Errorf("failed to build engine: %w", err) diff --git a/config/config.go b/config/config.go index 3ae240e8..7a261306 100644 --- a/config/config.go +++ b/config/config.go @@ -171,7 +171,10 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) } combined := NewEmptyWorkflowConfig() + combined.ConfigDir = appCfg.ConfigDir seenModules := make(map[string]string) + seenTriggers := make(map[string]string) + seenPipelines := make(map[string]string) for _, ref := range appCfg.Application.Workflows { if ref.File == "" { @@ -203,6 +206,21 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) seenModules[modCfg.Name] = wfName } + for k := range wfCfg.Triggers { + if existing, conflict := seenTriggers[k]; conflict { + return nil, fmt.Errorf("application %q: trigger name conflict: trigger %q is defined in both %q and %q", + appCfg.Application.Name, k, existing, wfName) + } + seenTriggers[k] = wfName + } + for k := range wfCfg.Pipelines { + if existing, conflict := seenPipelines[k]; conflict { + return nil, fmt.Errorf("application %q: pipeline name conflict: pipeline %q is defined in both %q and %q", + appCfg.Application.Name, k, existing, wfName) + } + seenPipelines[k] = wfName + } + combined.Modules = append(combined.Modules, wfCfg.Modules...) for k, v := range wfCfg.Workflows { combined.Workflows[k] = v @@ -213,6 +231,8 @@ func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) for k, v := range wfCfg.Pipelines { combined.Pipelines[k] = v } + // Fall back to first workflow file's directory if application config + // directory was not set. if combined.ConfigDir == "" { combined.ConfigDir = wfCfg.ConfigDir } diff --git a/engine_multi_config_test.go b/engine_multi_config_test.go index 9e1a3354..0d08e2fa 100644 --- a/engine_multi_config_test.go +++ b/engine_multi_config_test.go @@ -6,6 +6,7 @@ import ( "log/slog" "os" "path/filepath" + "strings" "testing" "github.com/CrisisTextLine/modular" @@ -212,6 +213,99 @@ triggers: {} } } +func TestBuildFromApplicationConfig_TriggerNameConflict(t *testing.T) { + dir := t.TempDir() + + wfA := ` +modules: [] +workflows: {} +triggers: + my-trigger: + type: http +pipelines: {} +` + wfB := ` +modules: [] +workflows: {} +triggers: + my-trigger: + type: schedule +pipelines: {} +` + writeTempYAML(t, dir, "a.yaml", wfA) + writeTempYAML(t, dir, "b.yaml", wfB) + + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "trigger-conflict-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "a.yaml")}, + {File: filepath.Join(dir, "b.yaml")}, + }, + }, + ConfigDir: dir, + }) + if err == nil { + t.Fatal("expected error for trigger name conflict") + } + if !strings.Contains(err.Error(), "trigger name conflict") { + t.Fatalf("expected 'trigger name conflict' in error, got: %v", err) + } +} + +func TestBuildFromApplicationConfig_PipelineNameConflict(t *testing.T) { + dir := t.TempDir() + + wfA := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + shared-pipeline: + steps: + - name: step-a + type: step.set + config: + values: + msg: "from a" +` + wfB := ` +modules: [] +workflows: {} +triggers: {} +pipelines: + shared-pipeline: + steps: + - name: step-b + type: step.set + config: + values: + msg: "from b" +` + writeTempYAML(t, dir, "a.yaml", wfA) + writeTempYAML(t, dir, "b.yaml", wfB) + + engine := newTestEngine(t) + err := engine.BuildFromApplicationConfig(&config.ApplicationConfig{ + Application: config.ApplicationInfo{ + Name: "pipeline-conflict-app", + Workflows: []config.WorkflowRef{ + {File: filepath.Join(dir, "a.yaml")}, + {File: filepath.Join(dir, "b.yaml")}, + }, + }, + ConfigDir: dir, + }) + if err == nil { + t.Fatal("expected error for pipeline name conflict") + } + if !strings.Contains(err.Error(), "pipeline name conflict") { + t.Fatalf("expected 'pipeline name conflict' in error, got: %v", err) + } +} + + func TestBuildFromApplicationConfig_MultipleWorkflows_MergesPipelines(t *testing.T) { dir := t.TempDir() diff --git a/module/pipeline_step_workflow_call.go b/module/pipeline_step_workflow_call.go index 373b6efc..e0c4d3fe 100644 --- a/module/pipeline_step_workflow_call.go +++ b/module/pipeline_step_workflow_call.go @@ -123,12 +123,13 @@ func (s *WorkflowCallStep) Execute(ctx context.Context, pc *PipelineContext) (*S } if s.mode == WorkflowCallModeAsync { - // Fire-and-forget: run in background goroutine with its own timeout - go func() { - asyncCtx, cancel := context.WithTimeout(context.Background(), s.timeout) + // Fire-and-forget: run in background goroutine derived from parent context + // so cancellation signals propagate, bounded by the configured timeout. + go func(parentCtx context.Context, data map[string]any) { + asyncCtx, cancel := context.WithTimeout(parentCtx, s.timeout) defer cancel() - _, _ = target.Execute(asyncCtx, triggerData) //nolint:errcheck - }() + _, _ = target.Execute(asyncCtx, data) //nolint:errcheck + }(ctx, triggerData) return &StepResult{Output: map[string]any{"workflow": s.workflow, "mode": "async", "dispatched": true}}, nil }