Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 136 additions & 14 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,33 @@

// loadConfig loads a workflow configuration from the configured file path,
// or returns an empty config if no path is set.
func loadConfig(logger *slog.Logger) (*config.WorkflowConfig, error) {
// If the config file contains an application-level config (multi-workflow),
// the returned WorkflowConfig will be nil and the ApplicationConfig will be set.
func loadConfig(logger *slog.Logger) (*config.WorkflowConfig, *config.ApplicationConfig, error) {
if *configFile != "" {
// Peek at the file to detect whether it is an application config.
data, err := os.ReadFile(*configFile)
if err != nil {
return nil, nil, fmt.Errorf("failed to read configuration file: %w", err)
}

if config.IsApplicationConfig(data) {
logger.Info("Detected multi-workflow application config", "file", *configFile)
appCfg, err := config.LoadApplicationConfig(*configFile)
if err != nil {
return nil, nil, fmt.Errorf("failed to load application configuration: %w", err)
}
return nil, appCfg, nil
}

cfg, err := config.LoadFromFile(*configFile)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %w", err)
return nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
return cfg, nil
return cfg, nil, nil
}
logger.Info("No config file specified, using empty workflow config")
return config.NewEmptyWorkflowConfig(), nil
return config.NewEmptyWorkflowConfig(), nil, nil
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -360,6 +377,111 @@
return app, nil
}

// setupFromAppConfig initializes all server components from a multi-workflow
// application config. It merges all workflow files into a combined WorkflowConfig,
// applies the admin config overlay, then builds the engine using
// BuildFromApplicationConfig so cross-workflow pipeline calls are wired up.
Comment on lines +382 to +383
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says the engine is built using BuildFromApplicationConfig, but the implementation merges configs and then calls buildEngine(combined, ...) (which uses BuildFromConfig). Either update the comment to match the actual flow, or switch to calling engine.BuildFromApplicationConfig(appCfg) if that’s the intended entry point so the code and documentation stay aligned.

Suggested change
// applies the admin config overlay, then builds the engine using
// BuildFromApplicationConfig so cross-workflow pipeline calls are wired up.
// applies the admin config overlay, then builds the engine from the merged
// configuration so cross-workflow pipeline calls are wired up.

Copilot uses AI. Check for mistakes.
func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) (*serverApp, error) {
// Merge all workflow files into a combined config so the admin overlay
// can be applied consistently (module names, route configs, etc.).
combined, err := config.MergeApplicationConfig(appCfg)
if err != nil {
return nil, fmt.Errorf("failed to merge application config: %w", err)
}

// Apply admin config overlay (admin UI, management routes, etc.).
if err := mergeAdminConfig(logger, combined); err != nil {
return nil, fmt.Errorf("failed to set up admin: %w", err)
}

// Build the engine from the already-merged application config (including the
// admin overlay). The merged config is passed directly to buildEngine, which
// internally uses BuildFromConfig and ensures features like the pipeline
// registry for step.workflow_call are configured correctly.
engine, loader, registry, err := buildEngine(combined, logger)
if err != nil {
return nil, fmt.Errorf("failed to build engine: %w", err)
}

sApp := &serverApp{
engine: engine,
logger: logger,
}

pool := dynamic.NewInterpreterPool()
aiSvc, deploySvc := initAIService(logger, registry, pool)
initManagementHandlers(logger, engine, combined, sApp, aiSvc, deploySvc, loader, registry)
registerManagementServices(logger, sApp)

sApp.postStartFuncs = append(sApp.postStartFuncs, func() error {
if err := sApp.initStores(logger); err != nil {
return err
}
return sApp.registerPostStartServices(logger)
}, func() error {
return sApp.importBundles(logger)
})

sApp.mgmt.auditLogger = audit.NewLogger(os.Stdout)
sApp.mgmt.auditLogger.LogConfigChange(context.Background(), "system", "server",
"server started with application config: "+appCfg.Application.Name)

return sApp, nil
}

// mergeAdminConfig loads the embedded admin config and merges admin
// modules/routes into the primary config. If --admin-ui-dir (or ADMIN_UI_DIR
// env var) is set the static.fileserver root is updated to that path,
// allowing the admin UI to be deployed and updated independently of the binary.
// If the config already contains admin modules (e.g., the user passed the
// admin config directly), the merge is skipped to avoid duplicates — but
// the UI root is still injected so the static fileserver works.
func mergeAdminConfig(logger *slog.Logger, cfg *config.WorkflowConfig) error {
// Resolve the UI root: flag > ADMIN_UI_DIR env > leave as configured in config.yaml
uiDir := *adminUIDir

// Check if the config already contains admin modules
for _, m := range cfg.Modules {
if m.Name == "admin-server" {
logger.Info("Config already contains admin modules, skipping merge")
if uiDir != "" {
injectUIRoot(cfg, uiDir)
logger.Info("Admin UI root overridden", "uiDir", uiDir)
}
return nil
}
}

adminCfg, err := admin.LoadConfig()
if err != nil {
return err
}

if uiDir != "" {
injectUIRoot(adminCfg, uiDir)
logger.Info("Admin UI root overridden", "uiDir", uiDir)
}

// Merge admin modules and routes into primary config
admin.MergeInto(cfg, adminCfg)

logger.Info("Admin UI enabled")
return nil
}

// injectUIRoot updates every static.fileserver module config in cfg to serve
// from the given root directory.
func injectUIRoot(cfg *config.WorkflowConfig, uiRoot string) {
for i := range cfg.Modules {
if cfg.Modules[i].Type == "static.fileserver" {
if cfg.Modules[i].Config == nil {
cfg.Modules[i].Config = make(map[string]any)
}
cfg.Modules[i].Config["root"] = uiRoot
}
}
}

// initManagementHandlers creates all management service handlers and stores
// them on the serverApp struct. These handlers are created once and persist
// across engine reloads. Only the service registrations need to be refreshed.
Expand Down Expand Up @@ -1389,20 +1511,20 @@
pgStore.Close()
}()

// Block until a termination signal is received, then let deferred cleanup run.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("Multi-workflow API on %s\n", *multiWorkflowAddr)
<-sigCh
fmt.Println("Shutting down multi-workflow mode...")
return
}
cfg, err := loadConfig(logger)
// Load configuration — supports both single-workflow and multi-workflow application configs.
cfg, appCfg, err := loadConfig(logger)
if err != nil {
log.Fatalf("Configuration error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}

app, err := setup(logger, cfg)
var app *serverApp
if appCfg != nil {
// Multi-workflow application config: build engine from application config
app, err = setupFromAppConfig(logger, appCfg)
} else {
// Single-workflow config (backward-compatible)
app, err = setup(logger, cfg)
}
if err != nil {
log.Fatalf("Setup error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}
Expand Down Expand Up @@ -1432,7 +1554,7 @@
// runMultiWorkflow implements multi-workflow mode: connects to PostgreSQL,
// runs migrations, creates an engine manager, mounts the REST API, and
// optionally seeds an initial workflow from -config.
func runMultiWorkflow(logger *slog.Logger) error {

Check failure on line 1557 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name runMultiWorkflow, expected (
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -1607,7 +1729,7 @@

// bootstrapAdmin creates an admin user if one doesn't already exist.
// It returns the admin user's UUID so callers can associate resources with them.
func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) (uuid.UUID, error) {

Check failure on line 1732 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name bootstrapAdmin, expected (
existing, err := users.GetByEmail(ctx, email)
if err != nil && !errors.Is(err, evstore.ErrNotFound) {
return uuid.Nil, fmt.Errorf("check existing admin: %w", err)
Expand Down Expand Up @@ -1641,7 +1763,7 @@
// slugify converts a string into a URL-friendly slug: lowercase, ASCII alphanumeric
// characters and hyphens only, with consecutive hyphens collapsed and leading/trailing
// hyphens trimmed.
func slugify(s string) string {

Check failure on line 1766 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name slugify, expected (
var b strings.Builder
for _, r := range strings.ToLower(s) {
if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' {
Expand All @@ -1659,7 +1781,7 @@

// ensureSystemProject finds or creates the "system" company and "default" project
// used to associate seed workflows with the required database entities.
func ensureSystemProject(ctx context.Context, pg *evstore.PGStore, ownerID uuid.UUID) (*evstore.Project, error) {

Check failure on line 1784 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name ensureSystemProject, expected (
const companySlug = "system"
const projectSlug = "default"

Expand Down Expand Up @@ -1698,7 +1820,7 @@
}

// seedWorkflow imports a YAML config as the initial workflow into the database.
func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, adminUserID uuid.UUID, logger *slog.Logger) error {

Check failure on line 1823 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name seedWorkflow, expected (
// Validate the config is loadable
if _, err := config.LoadFromFile(configPath); err != nil {
return fmt.Errorf("load config file: %w", err)
Expand Down Expand Up @@ -1752,7 +1874,7 @@
return nil
}

func initAIService(logger *slog.Logger, registry *dynamic.ComponentRegistry, pool *dynamic.InterpreterPool) (*ai.Service, *ai.DeployService) {

Check failure on line 1877 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name initAIService, expected (
svc := ai.NewService()

// Anthropic provider
Expand Down Expand Up @@ -1800,7 +1922,7 @@
// This is used as a stub for delegate services whose backing stores failed to
// initialize, preventing the delegate step from returning a hard 500 error
// ("service not found in registry").
func featureDisabledHandler(reason string) http.Handler {

Check failure on line 1925 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

syntax error: unexpected name featureDisabledHandler, expected (
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
Expand Down
12 changes: 9 additions & 3 deletions cmd/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,13 +329,16 @@ func TestLoadConfig_NoFile(t *testing.T) {
*configFile = ""
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

cfg, err := loadConfig(logger)
cfg, appCfg, err := loadConfig(logger)
if err != nil {
t.Fatalf("loadConfig failed: %v", err)
}
if cfg == nil {
t.Fatal("expected non-nil config")
}
if appCfg != nil {
t.Fatal("expected nil application config for empty configFile")
}
if len(cfg.Modules) != 0 {
t.Errorf("expected empty modules, got %d", len(cfg.Modules))
}
Expand All @@ -346,7 +349,7 @@ func TestLoadConfig_InvalidFile(t *testing.T) {
defer func() { *configFile = "" }()
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

_, err := loadConfig(logger)
_, _, err := loadConfig(logger)
if err == nil {
t.Fatal("expected error for nonexistent config file")
}
Expand Down Expand Up @@ -377,10 +380,13 @@ triggers: {}
defer func() { *configFile = "" }()
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

cfg, err := loadConfig(logger)
cfg, appCfg, err := loadConfig(logger)
if err != nil {
t.Fatalf("loadConfig failed: %v", err)
}
if appCfg != nil {
t.Fatal("expected nil application config for single-workflow YAML")
}
if len(cfg.Modules) != 1 {
t.Fatalf("expected 1 module, got %d", len(cfg.Modules))
}
Expand Down
143 changes: 143 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,67 @@ import (
"gopkg.in/yaml.v3"
)

// WorkflowRef is a reference to a workflow config file within an application config.
type WorkflowRef struct {
// File is the path to the workflow YAML config file (relative to the application config).
File string `json:"file" yaml:"file"`
// Name is an optional override for the workflow's name within the application namespace.
// If empty, the filename stem (without extension) is used.
Name string `json:"name,omitempty" yaml:"name,omitempty"`
}

// ApplicationInfo holds top-level metadata about a multi-workflow application.
type ApplicationInfo struct {
// Name is the application name.
Name string `json:"name" yaml:"name"`
// Workflows lists the workflow config files that make up this application.
Workflows []WorkflowRef `json:"workflows" yaml:"workflows"`
}

// ApplicationConfig is the top-level config for a multi-workflow application.
// It references multiple workflow config files that share a module registry.
type ApplicationConfig struct {
// Application holds the application-level metadata and workflow references.
Application ApplicationInfo `json:"application" yaml:"application"`
// ConfigDir is the directory of the application config file, used for resolving relative paths.
ConfigDir string `json:"-" yaml:"-"`
}

// LoadApplicationConfig loads an application config from a YAML file.
func LoadApplicationConfig(filepath string) (*ApplicationConfig, error) {
data, err := os.ReadFile(filepath)
if err != nil {
return nil, fmt.Errorf("failed to read application config file: %w", err)
}

var cfg ApplicationConfig
if err := yaml.Unmarshal(data, &cfg); err != nil {
return nil, fmt.Errorf("failed to parse application config file: %w", err)
}

// Store the config file's directory for relative path resolution
absPath, err := pathpkg.Abs(filepath)
if err == nil {
cfg.ConfigDir = pathpkg.Dir(absPath)
}

return &cfg, nil
}

// IsApplicationConfig returns true if the YAML data contains an application-level config
// (i.e., has an "application" key with a "workflows" section).
func IsApplicationConfig(data []byte) bool {
var probe struct {
Application *struct {
Workflows []any `yaml:"workflows"`
} `yaml:"application"`
}
if err := yaml.Unmarshal(data, &probe); err != nil {
return false
}
return probe.Application != nil && len(probe.Application.Workflows) > 0
}

// ModuleConfig represents a single module configuration
type ModuleConfig struct {
Name string `json:"name" yaml:"name"`
Expand Down Expand Up @@ -98,6 +159,88 @@ func ResolvePathInConfig(cfg map[string]any, path string) string {
return path
}

// MergeApplicationConfig loads all workflow config files referenced by an
// ApplicationConfig and merges them into a single WorkflowConfig. This is
// useful for callers that need a single combined config (e.g., the server's
// admin merge step) before passing it to the engine.
//
// Module name conflicts across files are reported as errors.
func MergeApplicationConfig(appCfg *ApplicationConfig) (*WorkflowConfig, error) {
if appCfg == nil {
return nil, fmt.Errorf("application config is nil")
}

combined := NewEmptyWorkflowConfig()
combined.ConfigDir = appCfg.ConfigDir
seenModules := make(map[string]string)
seenTriggers := make(map[string]string)
seenPipelines := make(map[string]string)

for _, ref := range appCfg.Application.Workflows {
if ref.File == "" {
return nil, fmt.Errorf("application %q: workflow reference has no 'file' field", appCfg.Application.Name)
}

filePath := ref.File
if !pathpkg.IsAbs(filePath) && appCfg.ConfigDir != "" {
filePath = pathpkg.Join(appCfg.ConfigDir, filePath)
}

wfCfg, err := LoadFromFile(filePath)
if err != nil {
return nil, fmt.Errorf("application %q: failed to load workflow file %q: %w", appCfg.Application.Name, ref.File, err)
}

// Derive a name for error messages
wfName := ref.Name
if wfName == "" {
base := pathpkg.Base(filePath)
wfName = base[:len(base)-len(pathpkg.Ext(base))]
}

for _, modCfg := range wfCfg.Modules {
if existing, conflict := seenModules[modCfg.Name]; conflict {
return nil, fmt.Errorf("application %q: module name conflict: module %q is defined in both %q and %q",
appCfg.Application.Name, modCfg.Name, existing, wfName)
}
seenModules[modCfg.Name] = wfName
}

for k := range wfCfg.Triggers {
if existing, conflict := seenTriggers[k]; conflict {
return nil, fmt.Errorf("application %q: trigger name conflict: trigger %q is defined in both %q and %q",
appCfg.Application.Name, k, existing, wfName)
}
seenTriggers[k] = wfName
}
for k := range wfCfg.Pipelines {
if existing, conflict := seenPipelines[k]; conflict {
return nil, fmt.Errorf("application %q: pipeline name conflict: pipeline %q is defined in both %q and %q",
appCfg.Application.Name, k, existing, wfName)
}
seenPipelines[k] = wfName
}

combined.Modules = append(combined.Modules, wfCfg.Modules...)
for k, v := range wfCfg.Workflows {
combined.Workflows[k] = v
}
for k, v := range wfCfg.Triggers {
combined.Triggers[k] = v
Comment on lines +225 to +229
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trigger name conflicts are not detected when merging multiple workflow files. Similar to the pipeline name conflict issue, if two workflow files define triggers with the same name, the second will silently overwrite the first (line 211 uses simple map assignment). Consider adding conflict detection for both triggers and workflows maps to match the module conflict detection on lines 198-204.

Copilot uses AI. Check for mistakes.
}
for k, v := range wfCfg.Pipelines {
combined.Pipelines[k] = v
Comment on lines +231 to +232
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipeline name conflicts are not detected when merging multiple workflow files. If two workflow files define pipelines with the same name, the second will silently overwrite the first (line 214 uses simple map assignment). This could lead to unexpected behavior where one workflow's pipeline is inaccessible. Consider adding conflict detection similar to the module name conflict check on lines 198-204.

Copilot uses AI. Check for mistakes.
}
// Fall back to first workflow file's directory if application config
// directory was not set.
if combined.ConfigDir == "" {
combined.ConfigDir = wfCfg.ConfigDir
}
Comment on lines +236 to +238
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigDir assignment uses only the first workflow file's directory (lines 216-218). When merging multiple workflow files from different directories, this may cause issues if module configs reference relative paths - only paths relative to the first workflow's directory will resolve correctly. Consider either: (1) using the application config's directory consistently, or (2) preserving each module's original config directory in a per-module field, or (3) documenting that all relative paths must be relative to the first workflow file's directory.

Copilot uses AI. Check for mistakes.
}

return combined, nil
}

// NewEmptyWorkflowConfig creates a new empty workflow configuration
func NewEmptyWorkflowConfig() *WorkflowConfig {
return &WorkflowConfig{
Expand Down
Loading
Loading