diff --git a/cmd/server/main.go b/cmd/server/main.go index 5f79ad01..ed5a66e1 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -423,6 +423,9 @@ func initManagementHandlers(logger *slog.Logger, engine *workflow.StdEngine, cfg mgmtHandler.SetReloadFunc(func(newCfg *config.WorkflowConfig) error { return app.reloadEngine(newCfg) }) + mgmtHandler.SetTryActivateFunc(func(newCfg *config.WorkflowConfig) (*module.TryActivateResult, error) { + return app.tryActivateEngine(newCfg) + }) mgmtHandler.SetStatusFunc(func() map[string]any { return map[string]any{"status": "running"} }) @@ -1051,37 +1054,67 @@ func (app *serverApp) registerPostStartServices(logger *slog.Logger) error { return nil } -// reloadEngine stops the current engine, builds a new one from the given config, -// starts it, and re-registers all persistent services with the new Application. -// This preserves all stores, handlers, and database connections across reloads. +// reloadEngine implements a safe try-activate reload: +// 1. Build candidate engine from newCfg (no ports bound, current engine stays live). +// 2. Stop current engine only after the candidate has been built successfully. +// 3. Start candidate engine; on failure rebuild from the previous config and +// restart it (rollback). +// +// Stores, handlers, and database connections stored on serverApp survive +// every reload cycle. func (app *serverApp) reloadEngine(newCfg *config.WorkflowConfig) error { logger := app.logger - // Stop the current engine - if stopErr := app.engine.Stop(context.Background()); stopErr != nil { - logger.Warn("Error stopping engine during reload", "error", stopErr) - } - - // Build and start a new engine + // Stage 1: Build candidate. Current engine is still live; if this fails + // the old engine continues serving without interruption. newEngine, _, _, buildErr := buildEngine(newCfg, logger) if buildErr != nil { - return fmt.Errorf("failed to rebuild engine: %w", buildErr) + return fmt.Errorf("failed to build candidate engine (current engine unchanged): %w", buildErr) } - // Update the serverApp reference BEFORE registering services, - // since registerManagementServices reads app.engine. - app.engine = newEngine + // Stage 2: Stop the current engine now that a viable candidate exists. + oldEngine := app.engine + oldConfig := app.currentConfig + if stopErr := oldEngine.Stop(context.Background()); stopErr != nil { + logger.Warn("Error stopping engine during reload", "error", stopErr) + } - // Re-register pre-start management services with the new Application + // Update references before registering services (registerManagementServices + // reads app.engine to reach the Application registry). + app.engine = newEngine + app.currentConfig = newCfg registerManagementServices(logger, app) - // Start the new engine + // Stage 3: Activate candidate. if startErr := newEngine.Start(context.Background()); startErr != nil { - return fmt.Errorf("failed to start reloaded engine: %w", startErr) + logger.Error("Candidate engine failed to start; attempting rollback to previous config", "error", startErr) + if stopErr := newEngine.Stop(context.Background()); stopErr != nil { + logger.Warn("Failed to stop candidate engine after failed reload start", "error", stopErr) + } + + // Rollback: rebuild from previous config and restart. + rollbackEngine, _, _, rollbackBuildErr := buildEngine(oldConfig, logger) + if rollbackBuildErr != nil { + app.currentConfig = oldConfig // keep old config pointer for diagnostics + return fmt.Errorf("reload failed AND rollback build failed — process is degraded: candidate=%w, rollback=%v", startErr, rollbackBuildErr) + } + app.engine = rollbackEngine + app.currentConfig = oldConfig + registerManagementServices(logger, app) + if rollbackStartErr := rollbackEngine.Start(context.Background()); rollbackStartErr != nil { + return fmt.Errorf("reload failed AND rollback start failed — process is degraded: candidate=%w, rollback=%v", startErr, rollbackStartErr) + } + if app.stores.v1Store != nil { + if regErr := app.registerPostStartServices(logger); regErr != nil { + logger.Warn("Failed to re-register post-start services during rollback", "error", regErr) + } + } + logger.Info("Engine reload rolled back to previous config") + return fmt.Errorf("reload failed (rolled back to previous config): %w", startErr) } - // Re-register post-start services (stores already initialized, just need - // to be re-registered with the new Application's service registry). + // Stage 4: Re-register post-start services with the new Application's + // service registry (stores are already initialized, just need re-wiring). if app.stores.v1Store != nil { if regErr := app.registerPostStartServices(logger); regErr != nil { return fmt.Errorf("failed to re-register post-start services: %w", regErr) @@ -1092,6 +1125,29 @@ func (app *serverApp) reloadEngine(newCfg *config.WorkflowConfig) error { return nil } +// tryActivateEngine builds a candidate engine from cfg without stopping the +// current engine or swapping any active pointers. It is a probe-only operation +// that returns a structured result describing what the candidate would expose. +// If the build fails, the current engine is completely unaffected. +func (app *serverApp) tryActivateEngine(cfg *config.WorkflowConfig) (*module.TryActivateResult, error) { + candidateEngine, _, _, buildErr := buildEngine(cfg, app.logger) + if buildErr != nil { + return &module.TryActivateResult{ + Status: "build_failed", + Error: buildErr.Error(), + }, buildErr + } + + // Collect registered module/step/trigger type names from the candidate. + result := &module.TryActivateResult{ + Status: "build_ok", + ModuleTypes: candidateEngine.RegisteredModuleTypes(), + StepTypes: candidateEngine.RegisteredStepTypes(), + TriggerTypes: candidateEngine.RegisteredTriggerTypes(), + } + return result, nil +} + // importBundles imports and deploys workflow bundles specified via --import-bundle. func (app *serverApp) importBundles(logger *slog.Logger) error { if *importBundle == "" { diff --git a/cmd/server/main_test.go b/cmd/server/main_test.go index c013cd9f..79fcd764 100644 --- a/cmd/server/main_test.go +++ b/cmd/server/main_test.go @@ -5,10 +5,12 @@ import ( "context" "encoding/json" "log/slog" + "net" "net/http" "net/http/httptest" "os" "path/filepath" + "strings" "testing" "time" @@ -917,3 +919,279 @@ func TestEnvOverride_ImportBundle(t *testing.T) { t.Errorf("importBundle = %q, want %q", *importBundle, "/some/bundle.tar.gz") } } + +// TestReloadEngine_BuildFailureKeepsPriorEngineActive verifies that when the +// candidate config cannot be built (e.g. unknown module type), reloadEngine +// returns an error and the serverApp's engine pointer is left unchanged. +// This proves the safe try-activate contract: the current engine stays live +// for the entire build phase and is never stopped on build failure. +func TestReloadEngine_BuildFailureKeepsPriorEngineActive(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + + // Start with a valid minimal config. + initialCfg := config.NewEmptyWorkflowConfig() + app, err := setup(logger, initialCfg) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + originalEngine := app.engine + + // Attempt a reload with an invalid config (unknown module type). + badCfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "bad", Type: "nonexistent.module.type.for.test"}, + }, + Workflows: map[string]any{}, + Triggers: map[string]any{}, + } + + reloadErr := app.reloadEngine(badCfg) + if reloadErr == nil { + t.Fatal("expected reloadEngine to return an error for invalid module type") + } + + // The engine reference must be unchanged — the old engine was not stopped. + if app.engine != originalEngine { + t.Error("reloadEngine build failure replaced the active engine; expected it to be unchanged") + } +} + +// TestReloadEngine_SuccessReplacesEngine verifies that a successful reload +// replaces the engine pointer and updates currentConfig. +func TestReloadEngine_SuccessReplacesEngine(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + initialCfg := config.NewEmptyWorkflowConfig() + + app, err := setup(logger, initialCfg) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + originalEngine := app.engine + + // Reload with another valid empty config. + newCfg := config.NewEmptyWorkflowConfig() + if err := app.reloadEngine(newCfg); err != nil { + t.Fatalf("reloadEngine with valid config failed: %v", err) + } + + if app.engine == originalEngine { + t.Error("expected reloadEngine to replace the engine pointer") + } + if app.engine == nil { + t.Error("expected non-nil engine after successful reload") + } +} + +// TestReloadEngine_StartFailureRollsBackToPriorConfig verifies that a candidate +// engine start failure restores the prior config and active engine. +func TestReloadEngine_StartFailureRollsBackToPriorConfig(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + initialCfg := config.NewEmptyWorkflowConfig() + + app, err := setup(logger, initialCfg) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + originalConfig := app.currentConfig + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen on test port: %v", err) + } + t.Cleanup(func() { _ = listener.Close() }) + + conflictingCfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + { + Name: "conflicting-server", + Type: "http.server", + Config: map[string]any{"address": listener.Addr().String()}, + }, + }, + Workflows: map[string]any{}, + Triggers: map[string]any{}, + } + + reloadErr := app.reloadEngine(conflictingCfg) + if reloadErr == nil { + t.Fatal("expected reloadEngine to fail when candidate cannot bind its port") + } + if app.currentConfig != originalConfig { + t.Error("expected rollback to restore previous config pointer") + } + if app.engine == nil { + t.Fatal("expected rollback to leave an active engine") + } +} + +func TestReloadEngine_StartFailureReportsRollbackBuildFailure(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + app, err := setup(logger, config.NewEmptyWorkflowConfig()) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + + app.currentConfig = &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "bad", Type: "nonexistent.module.type.for.rollback"}, + }, + Workflows: map[string]any{}, + Triggers: map[string]any{}, + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen on test port: %v", err) + } + t.Cleanup(func() { _ = listener.Close() }) + + reloadErr := app.reloadEngine(configWithHTTPServerOn(listener.Addr().String())) + if reloadErr == nil { + t.Fatal("expected reloadEngine to report rollback build failure") + } + if !strings.Contains(reloadErr.Error(), "rollback build failed") { + t.Fatalf("expected rollback build failure, got %v", reloadErr) + } +} + +func TestReloadEngine_StartFailureReportsRollbackStartFailure(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + app, err := setup(logger, config.NewEmptyWorkflowConfig()) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("listen on test port: %v", err) + } + t.Cleanup(func() { _ = listener.Close() }) + conflictingCfg := configWithHTTPServerOn(listener.Addr().String()) + app.currentConfig = conflictingCfg + + reloadErr := app.reloadEngine(conflictingCfg) + if reloadErr == nil { + t.Fatal("expected reloadEngine to report rollback start failure") + } + if !strings.Contains(reloadErr.Error(), "rollback start failed") { + t.Fatalf("expected rollback start failure, got %v", reloadErr) + } +} + +func configWithHTTPServerOn(address string) *config.WorkflowConfig { + return &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + { + Name: "conflicting-server", + Type: "http.server", + Config: map[string]any{"address": address}, + }, + }, + Workflows: map[string]any{}, + Triggers: map[string]any{}, + } +} + +// TestTryActivateEngine_ValidConfigReturnsBuildOK verifies that tryActivateEngine +// returns a "build_ok" result for a valid config without touching the active engine. +func TestTryActivateEngine_ValidConfigReturnsBuildOK(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + initialCfg := config.NewEmptyWorkflowConfig() + + app, err := setup(logger, initialCfg) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + originalEngine := app.engine + + candidateCfg := config.NewEmptyWorkflowConfig() + result, err := app.tryActivateEngine(candidateCfg) + if err != nil { + t.Fatalf("tryActivateEngine failed unexpectedly: %v", err) + } + if result == nil { + t.Fatal("expected non-nil TryActivateResult") + } + if result.Status != "build_ok" { + t.Errorf("expected status %q, got %q", "build_ok", result.Status) + } + // Active engine pointer must be unchanged. + if app.engine != originalEngine { + t.Error("tryActivateEngine must not replace the active engine pointer") + } +} + +// TestTryActivateEngine_InvalidConfigReturnsBuildFailed verifies that +// tryActivateEngine returns a non-nil error and a "build_failed" result +// for a config with an unknown module type. +func TestTryActivateEngine_InvalidConfigReturnsBuildFailed(t *testing.T) { + t.Setenv("ANTHROPIC_API_KEY", "") + t.Setenv("JWT_SECRET", "test-secret-that-is-at-least-32-bytes-long") + *anthropicKey = "" + *copilotCLI = "" + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + initialCfg := config.NewEmptyWorkflowConfig() + + app, err := setup(logger, initialCfg) + if err != nil { + t.Fatalf("setup failed: %v", err) + } + originalEngine := app.engine + + badCfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "bad", Type: "nonexistent.module.type.for.test"}, + }, + Workflows: map[string]any{}, + Triggers: map[string]any{}, + } + + result, buildErr := app.tryActivateEngine(badCfg) + if buildErr == nil { + t.Fatal("expected tryActivateEngine to return error for invalid module type") + } + if result == nil { + t.Fatal("expected non-nil TryActivateResult even on failure") + } + if result.Status != "build_failed" { + t.Errorf("expected status %q, got %q", "build_failed", result.Status) + } + if result.Error == "" { + t.Error("expected non-empty error field in TryActivateResult") + } + // Active engine pointer must be unchanged. + if app.engine != originalEngine { + t.Error("tryActivateEngine failure must not replace the active engine pointer") + } +} diff --git a/docs/APPLICATION_LIFECYCLE.md b/docs/APPLICATION_LIFECYCLE.md index c2488eda..06f52414 100644 --- a/docs/APPLICATION_LIFECYCLE.md +++ b/docs/APPLICATION_LIFECYCLE.md @@ -139,42 +139,85 @@ changes only when adding new module types. ### What Happens When You Change the YAML? -The current system supports two reload mechanisms: +The current system supports three reload mechanisms: -#### 1. Full Engine Restart (Config Changes) +#### 1. Safe Try-Activate Reload (Default — Config Changes) -When the YAML config is updated (via the UI handler or file change): +When the YAML config is updated (via the admin API, V1 API, or file watcher), +the engine uses a **try-activate, health-probe, rollback** sequence: ``` Time ──────────────────────────────────────────────────> - Old Engine Running New Engine Running - ├── handling requests ──┤ ├── handling requests ──> - │ │ - stop │ │ start - │ │ - ┌────┴───┴────┐ - │ Reload │ - │ (200-500ms)│ - └─────────────┘ + Old Engine Running (unchanged during build) New Engine + ├── handling requests ──────────────────┤ ├── handling requests ──> + │ │ + Stage 1 │ │ Stage 3: start + buildEngine │ │ + (no stop) │ │ + │ │ + Stage 2 │ │ + stop │ │ + │ │ + ┌──────┴─────┴──────┐ + │ Safe Reload Cycle │ + │ (200-500ms total) │ + └────────────────────┘ ``` -1. `engine.Stop()` -- stops all triggers, waits for module shutdown -2. `buildEngine(newConfig)` -- creates fresh engine from new YAML -3. `engine.Start()` -- starts all modules and triggers +**Stages:** -**Impact on in-flight requests:** -- Requests currently being processed will fail (connection reset) -- No graceful draining -- this is a hard stop/start -- Typical reload time: 200-500ms for the e-commerce example (24 modules) +1. **Build candidate** — `buildEngine(newConfig)` constructs a fresh engine from + the candidate YAML. The current engine is completely untouched at this point. + If the build fails (unknown module type, invalid config, etc.) the error is + returned immediately and the current engine keeps serving. -**What this means for production:** -- Brief downtime during config updates -- Behind a load balancer, you can do rolling updates (stop instance A, update, - start A, stop instance B, update, start B) -- For zero-downtime, use the multi-instance model with rolling deploys +2. **Stop current** — `engine.Stop()` stops all triggers and modules of the old + engine. Only reached when the candidate was built successfully. -#### 2. Dynamic Component Hot-Reload (No Downtime) +3. **Activate candidate** — `engine.Start()` starts all modules and triggers of + the new engine. On failure, the engine is rebuilt from the previous config and + restarted (**rollback**). + +**Rollback contract:** a build failure at Stage 1 guarantees the current engine +is untouched. A start failure at Stage 3 triggers an automatic rollback to the +previous config, restoring service. + +**What this means for operators:** +- Malformed YAML or unknown module types are caught before any disruption. +- Brief downtime still occurs between Stage 2 and Stage 3. +- Use the `try-activate` probe endpoint for zero-impact pre-flight validation. + +#### 2. Legacy External Plugin Reload (Deprecated semantics — pre-v1.0) + +> **Do not use this pattern in new code.** It is documented here only for +> historical reference. The current `ReloadPlugin` API (below) already uses the +> safe try-activate contract, not this legacy path. + +**Legacy (unsafe) unload-then-load sequence:** + +``` +1. Kill old subprocess +2. Start new subprocess ← failure here leaves the slot empty +``` + +If Step 2 fails, no plugin is registered and the slot is dark. This was the +original behaviour before the safe try-activate contract was introduced. + +**Safe (current) ReloadPlugin sequence:** + +``` +1. Start candidate subprocess and perform handshake/contract validation + ← failure here: kill candidate, keep old process registered +2. Kill old subprocess only after candidate is validated +3. Register candidate as the new active plugin +``` + +The `/api/v1/plugins/external/{name}/reload` API endpoint uses this safe +contract. See [Plugin Try-Activate Probe](#plugin-try-activate-probe) below +for the equivalent probe-only path. + +#### 3. Dynamic Component Hot-Reload (No Downtime) Dynamic components (`.go` files loaded via Yaegi) support true hot-reload: @@ -254,6 +297,50 @@ Developer edits components/payment_processor.go --- +### Plugin Try-Activate Probe + +Before committing a plugin or config reload, callers can issue a dry-run probe +that builds the candidate without swapping any active pointers: + +**Config try-activate:** + +```bash +curl -X POST http://localhost:8081/api/v1/admin/engine/try-activate \ + -H "Content-Type: application/json" \ + --data-binary @candidate-workflow.json +``` + +Response: +```json +{ + "status": "build_ok", + "moduleTypes": ["http.server", "http.router", "messaging.kafka"], + "stepTypes": ["step.set", "step.http_call"], + "triggerTypes": ["http", "schedule"] +} +``` + +On failure: +```json +{ + "status": "build_failed", + "error": "module type \"nonexistent.type\" not found" +} +``` + +The probe: +- builds a full candidate engine in isolation +- returns the module/step/trigger types the candidate would expose +- **never touches the running engine** — no stop, no swap +- is suitable for pre-flight checks in automated update campaigns + +**Plugin try-activate:** the `/api/v1/plugins/external/{name}/reload` endpoint +already implements try-activate semantics: the candidate subprocess is started +and validated before the old process is killed. A failure returns an error and +leaves the current plugin process registered. + +--- + ## Scaling for High Load ### Current State: What's In-Memory diff --git a/docs/DEPLOYMENT_GUIDE.md b/docs/DEPLOYMENT_GUIDE.md index d35eeda1..7f3dc5d7 100644 --- a/docs/DEPLOYMENT_GUIDE.md +++ b/docs/DEPLOYMENT_GUIDE.md @@ -239,7 +239,48 @@ curl -X POST http://localhost:8081/api/v1/admin/engine/reload \ --data-binary @updated-workflow.yaml ``` -The reload stops the current engine, builds and starts a new one, and rolls back on failure. In-flight requests may be dropped -- use Kubernetes rolling deployments for zero-downtime updates in production. +The reload uses a **safe try-activate sequence**: + +1. Build candidate engine from the new config (current engine stays live). +2. On build failure → return error, current engine untouched. +3. Stop current engine only after candidate is built successfully. +4. Start candidate engine. +5. On start failure → rebuild from previous config and restart (rollback). + +In-flight requests may be dropped during the stop/start window. For +zero-downtime updates use Kubernetes rolling deployments. + +### Config Try-Activate Probe (Dry Run) + +Validate that a candidate config can be built without touching the running engine: + +```bash +curl -X POST http://localhost:8081/api/v1/admin/engine/try-activate \ + -H "Content-Type: application/json" \ + --data-binary @candidate-workflow.json +``` + +Response on success: +```json +{ + "status": "build_ok", + "moduleTypes": ["http.server", "http.router"], + "stepTypes": ["step.set", "step.http_call"], + "triggerTypes": ["http"] +} +``` + +Response on failure: +```json +{ + "status": "build_failed", + "error": "module type \"nonexistent.type\" not found" +} +``` + +The probe is safe to call against a live server — it never stops or swaps the +active engine. Use it from automated update managers to gate a rollout before +issuing a full reload. ### Example Configurations diff --git a/docs/PLUGIN_DEVELOPMENT_GUIDE.md b/docs/PLUGIN_DEVELOPMENT_GUIDE.md index 13497abb..b221cab3 100644 --- a/docs/PLUGIN_DEVELOPMENT_GUIDE.md +++ b/docs/PLUGIN_DEVELOPMENT_GUIDE.md @@ -559,11 +559,22 @@ Unload a running plugin (graceful shutdown of the subprocess): curl -X POST http://localhost:8081/api/v1/plugins/external/my-plugin/unload ``` -Reload a plugin (unload + load, useful after updating the binary): +Reload a plugin using the safe try-activate contract (candidate subprocess is +validated before the active process is killed): ```bash curl -X POST http://localhost:8081/api/v1/plugins/external/my-plugin/reload ``` +> **Legacy vs. safe reload semantics** +> +> *Legacy (unsafe, pre-v1.0):* kill old → start new. If the new start fails, the +> slot is empty and the plugin surface is dark. +> +> *Safe (current):* start candidate → validate handshake/contract → kill old only +> after candidate is healthy. If the candidate fails to start or validate, the old +> process stays registered and serving. The `/reload` endpoint uses the safe +> contract exclusively. + ## API Endpoints All external plugin management endpoints are under the `/api/v1/plugins/external` prefix. diff --git a/engine.go b/engine.go index 0998cda1..2ff42f4d 100644 --- a/engine.go +++ b/engine.go @@ -6,6 +6,7 @@ import ( "fmt" "log/slog" "path/filepath" + "sort" "strings" "time" @@ -225,6 +226,39 @@ func (e *StdEngine) GetStepRegistry() interfaces.StepRegistrar { return e.stepRegistry } +// RegisteredModuleTypes returns the sorted list of module type names registered +// with this engine (via AddModuleType or LoadPlugin). +func (e *StdEngine) RegisteredModuleTypes() []string { + types := make([]string, 0, len(e.moduleFactories)) + for t := range e.moduleFactories { + types = append(types, t) + } + sort.Strings(types) + return types +} + +// RegisteredStepTypes returns the sorted list of pipeline step type names +// registered with this engine (via AddStepType or LoadPlugin). +func (e *StdEngine) RegisteredStepTypes() []string { + if e.stepRegistry == nil { + return nil + } + types := e.stepRegistry.Types() + sort.Strings(types) + return types +} + +// RegisteredTriggerTypes returns the sorted list of trigger type keys registered +// with this engine (via RegisterTriggerType or LoadPlugin). +func (e *StdEngine) RegisteredTriggerTypes() []string { + types := make([]string, 0, len(e.triggerTypeMap)) + for t := range e.triggerTypeMap { + types = append(types, t) + } + sort.Strings(types) + return types +} + // PluginLoader returns the engine's plugin loader, creating it lazily if needed. func (e *StdEngine) PluginLoader() *plugin.PluginLoader { if e.pluginLoader == nil { diff --git a/engine_test.go b/engine_test.go index 6efaed48..09663a63 100644 --- a/engine_test.go +++ b/engine_test.go @@ -1739,3 +1739,79 @@ func TestEngine_BuildFromConfig_InvalidTemplateRefsMode(t *testing.T) { t.Errorf("unexpected error message: %v", err) } } + +func TestStdEngine_RegisteredModuleTypes(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + // Add a custom type and verify it appears in the result. + engine.AddModuleType("test.custom.module", func(name string, cfg map[string]any) modular.Module { + return nil + }) + + types := engine.RegisteredModuleTypes() + found := false + for _, tp := range types { + if tp == "test.custom.module" { + found = true + break + } + } + if !found { + t.Errorf("RegisteredModuleTypes did not include %q; got %v", "test.custom.module", types) + } + + // Result must be sorted. + for i := 1; i < len(types); i++ { + if types[i] < types[i-1] { + t.Errorf("RegisteredModuleTypes is not sorted: %v", types) + break + } + } +} + +func TestStdEngine_RegisteredStepTypes(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + loadAllPlugins(t, engine) + + types := engine.RegisteredStepTypes() + if len(types) == 0 { + t.Fatal("expected non-empty step types after loading plugins") + } + + // Result must be sorted. + for i := 1; i < len(types); i++ { + if types[i] < types[i-1] { + t.Errorf("RegisteredStepTypes is not sorted: %v", types) + break + } + } +} + +func TestStdEngine_RegisteredStepTypes_NilRegistry(t *testing.T) { + engine := &StdEngine{} + if types := engine.RegisteredStepTypes(); types != nil { + t.Fatalf("expected nil step types for nil registry, got %v", types) + } +} + +func TestStdEngine_RegisteredTriggerTypes(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + loadAllPlugins(t, engine) + + types := engine.RegisteredTriggerTypes() + // With plugins loaded there should be at least one trigger type (e.g. "http"). + if len(types) == 0 { + t.Fatal("expected non-empty trigger types after loading plugins") + } + + // Result must be sorted. + for i := 1; i < len(types); i++ { + if types[i] < types[i-1] { + t.Errorf("RegisteredTriggerTypes is not sorted: %v", types) + break + } + } +} diff --git a/module/api_workflow_ui.go b/module/api_workflow_ui.go index 5b2a4170..60092dd4 100644 --- a/module/api_workflow_ui.go +++ b/module/api_workflow_ui.go @@ -11,6 +11,18 @@ import ( "gopkg.in/yaml.v3" ) +// TryActivateResult is the structured response returned by the try-activate +// endpoint. It reports whether the candidate config can be built and what +// module/step/trigger types the resulting engine would expose. +type TryActivateResult struct { + // Status is "build_ok" on success, "build_failed" on failure. + Status string `json:"status"` + Error string `json:"error,omitempty"` + ModuleTypes []string `json:"moduleTypes,omitempty"` + StepTypes []string `json:"stepTypes,omitempty"` + TriggerTypes []string `json:"triggerTypes,omitempty"` +} + // ServiceInfo describes a registered service for API responses. type ServiceInfo struct { Name string `json:"name"` @@ -21,11 +33,12 @@ type ServiceInfo struct { // WorkflowUIHandler serves the workflow editor UI and provides API endpoints // for managing workflow configurations. type WorkflowUIHandler struct { - mu sync.RWMutex - config *config.WorkflowConfig - reloadFn func(*config.WorkflowConfig) error - engineStatus func() map[string]any - svcRegistry func() map[string]any + mu sync.RWMutex + config *config.WorkflowConfig + reloadFn func(*config.WorkflowConfig) error + tryActivateFn func(*config.WorkflowConfig) (*TryActivateResult, error) + engineStatus func() map[string]any + svcRegistry func() map[string]any } // NewWorkflowUIHandler creates a new handler with an optional initial config. @@ -41,6 +54,14 @@ func (h *WorkflowUIHandler) SetReloadFunc(fn func(*config.WorkflowConfig) error) h.reloadFn = fn } +// SetTryActivateFunc sets the callback for try-activate (build without deploy). +// The callback should build a candidate engine from the given config and return +// a TryActivateResult describing what the candidate would expose. It must not +// stop the current engine or swap any active pointers. +func (h *WorkflowUIHandler) SetTryActivateFunc(fn func(*config.WorkflowConfig) (*TryActivateResult, error)) { + h.tryActivateFn = fn +} + // SetStatusFunc sets the callback for getting engine status. func (h *WorkflowUIHandler) SetStatusFunc(fn func() map[string]any) { h.engineStatus = fn @@ -59,6 +80,7 @@ func (h *WorkflowUIHandler) RegisterRoutes(mux *http.ServeMux) { mux.HandleFunc("GET /api/workflow/services", h.handleGetServices) mux.HandleFunc("POST /api/workflow/validate", h.handleValidate) mux.HandleFunc("POST /api/workflow/reload", h.handleReload) + mux.HandleFunc("POST /api/workflow/try-activate", h.handleTryActivate) mux.HandleFunc("GET /api/workflow/status", h.handleStatus) } @@ -232,6 +254,8 @@ func (h *WorkflowUIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.handleValidate(w, r) case "reload": h.handleReload(w, r) + case "try-activate": + h.handleTryActivate(w, r) default: w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusNotFound) @@ -276,6 +300,11 @@ func (h *WorkflowUIHandler) HandleReload(w http.ResponseWriter, r *http.Request) h.handleReload(w, r) } +// HandleTryActivate probes a candidate config (POST /engine/try-activate). +func (h *WorkflowUIHandler) HandleTryActivate(w http.ResponseWriter, r *http.Request) { + h.handleTryActivate(w, r) +} + // HandleStatus returns the engine status (GET /engine/status). func (h *WorkflowUIHandler) HandleStatus(w http.ResponseWriter, r *http.Request) { h.handleStatus(w, r) @@ -431,3 +460,40 @@ func (h *WorkflowUIHandler) handleValidate(w http.ResponseWriter, r *http.Reques http.Error(w, "failed to encode result", http.StatusInternalServerError) } } + +// handleTryActivate builds a candidate engine from the request body without +// stopping the current engine. It is a probe-only operation: no active pointer +// is swapped and the current engine continues serving. The JSON response +// includes the build status and the module/step/trigger types the candidate +// would expose, giving operators and agent update managers enough context to +// decide whether to commit a full reload. +// +// Request body: JSON-encoded WorkflowConfig. +// Response: TryActivateResult JSON. +func (h *WorkflowUIHandler) handleTryActivate(w http.ResponseWriter, r *http.Request) { + if h.tryActivateFn == nil { + http.Error(w, "try-activate not configured", http.StatusServiceUnavailable) + return + } + + var cfg config.WorkflowConfig + if err := json.NewDecoder(r.Body).Decode(&cfg); err != nil { + http.Error(w, fmt.Sprintf("invalid JSON: %v", err), http.StatusBadRequest) + return + } + + result, err := h.tryActivateFn(&cfg) + if result == nil && err != nil { + result = &TryActivateResult{ + Status: "build_failed", + Error: err.Error(), + } + } + w.Header().Set("Content-Type", "application/json") + if err != nil { + w.WriteHeader(http.StatusUnprocessableEntity) + } + if encErr := json.NewEncoder(w).Encode(result); encErr != nil { + http.Error(w, "failed to encode response", http.StatusInternalServerError) + } +} diff --git a/module/api_workflow_ui_test.go b/module/api_workflow_ui_test.go index 81d50b87..07a4b1a2 100644 --- a/module/api_workflow_ui_test.go +++ b/module/api_workflow_ui_test.go @@ -619,3 +619,160 @@ func TestWorkflowUIHandler_HandleManagement_WithHTTPHandler(t *testing.T) { t.Errorf("unexpected config: %+v", result) } } + +// TestWorkflowUIHandler_TryActivate_NotConfigured verifies that the handler +// returns 503 when no tryActivateFn is set. +func TestWorkflowUIHandler_TryActivate_NotConfigured(t *testing.T) { + h := NewWorkflowUIHandler(nil) + mux := http.NewServeMux() + h.RegisterRoutes(mux) + + body, _ := json.Marshal(&config.WorkflowConfig{}) + req := httptest.NewRequest(http.MethodPost, "/api/workflow/try-activate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("expected 503, got %d", w.Code) + } +} + +// TestWorkflowUIHandler_TryActivate_Success verifies that when tryActivateFn +// succeeds the handler returns 200 with a build_ok result. +func TestWorkflowUIHandler_TryActivate_Success(t *testing.T) { + h := NewWorkflowUIHandler(nil) + h.SetTryActivateFunc(func(cfg *config.WorkflowConfig) (*TryActivateResult, error) { + return &TryActivateResult{ + Status: "build_ok", + ModuleTypes: []string{"http.server"}, + StepTypes: []string{"step.set"}, + }, nil + }) + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + + body, _ := json.Marshal(&config.WorkflowConfig{}) + req := httptest.NewRequest(http.MethodPost, "/api/workflow/try-activate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var result TryActivateResult + if err := json.NewDecoder(w.Body).Decode(&result); err != nil { + t.Fatalf("decode: %v", err) + } + if result.Status != "build_ok" { + t.Errorf("expected status %q, got %q", "build_ok", result.Status) + } + if len(result.ModuleTypes) == 0 { + t.Error("expected non-empty moduleTypes in response") + } +} + +// TestWorkflowUIHandler_TryActivate_Failure verifies that when tryActivateFn +// returns an error the handler returns 422 with a build_failed result. +func TestWorkflowUIHandler_TryActivate_Failure(t *testing.T) { + h := NewWorkflowUIHandler(nil) + h.SetTryActivateFunc(func(cfg *config.WorkflowConfig) (*TryActivateResult, error) { + return &TryActivateResult{ + Status: "build_failed", + Error: "unknown module type", + }, fmt.Errorf("unknown module type") + }) + + mux := http.NewServeMux() + h.RegisterRoutes(mux) + + body, _ := json.Marshal(&config.WorkflowConfig{}) + req := httptest.NewRequest(http.MethodPost, "/api/workflow/try-activate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + mux.ServeHTTP(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Fatalf("expected 422, got %d: %s", w.Code, w.Body.String()) + } + + var result TryActivateResult + if err := json.NewDecoder(w.Body).Decode(&result); err != nil { + t.Fatalf("decode: %v", err) + } + if result.Status != "build_failed" { + t.Errorf("expected status %q, got %q", "build_failed", result.Status) + } + if result.Error == "" { + t.Error("expected non-empty error in response") + } +} + +func TestWorkflowUIHandler_TryActivate_NilResultWithError(t *testing.T) { + h := NewWorkflowUIHandler(nil) + h.SetTryActivateFunc(func(cfg *config.WorkflowConfig) (*TryActivateResult, error) { + return nil, fmt.Errorf("probe failed") + }) + + body, _ := json.Marshal(&config.WorkflowConfig{}) + req := httptest.NewRequest(http.MethodPost, "/api/workflow/try-activate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + h.HandleTryActivate(w, req) + + if w.Code != http.StatusUnprocessableEntity { + t.Fatalf("expected 422, got %d: %s", w.Code, w.Body.String()) + } + + var result TryActivateResult + if err := json.NewDecoder(w.Body).Decode(&result); err != nil { + t.Fatalf("decode: %v", err) + } + if result.Status != "build_failed" { + t.Errorf("expected status %q, got %q", "build_failed", result.Status) + } + if !strings.Contains(result.Error, "probe failed") { + t.Errorf("expected error to contain probe failure, got %q", result.Error) + } +} + +func TestWorkflowUIHandler_TryActivate_InvalidJSON(t *testing.T) { + h := NewWorkflowUIHandler(nil) + h.SetTryActivateFunc(func(cfg *config.WorkflowConfig) (*TryActivateResult, error) { + t.Fatal("tryActivateFn should not be called for invalid JSON") + return nil, nil + }) + + req := httptest.NewRequest(http.MethodPost, "/api/workflow/try-activate", strings.NewReader("{")) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + h.HandleTryActivate(w, req) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestWorkflowUIHandler_TryActivate_ServeHTTP verifies the ServeHTTP dispatch +// includes the try-activate path. +func TestWorkflowUIHandler_TryActivate_ServeHTTP(t *testing.T) { + h := NewWorkflowUIHandler(nil) + called := false + h.SetTryActivateFunc(func(cfg *config.WorkflowConfig) (*TryActivateResult, error) { + called = true + return &TryActivateResult{Status: "build_ok"}, nil + }) + + body, _ := json.Marshal(&config.WorkflowConfig{}) + req := httptest.NewRequest(http.MethodPost, "/api/v1/admin/engine/try-activate", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + w := httptest.NewRecorder() + h.ServeHTTP(w, req) + + if !called { + t.Error("expected tryActivateFn to be called via ServeHTTP") + } +} diff --git a/module/openapi_admin_schemas.go b/module/openapi_admin_schemas.go index 77363965..da4cba32 100644 --- a/module/openapi_admin_schemas.go +++ b/module/openapi_admin_schemas.go @@ -116,6 +116,17 @@ func registerComponentSchemas(gen *OpenAPIGenerator) { }, }) + gen.RegisterComponentSchema("TryActivateResult", &OpenAPISchema{ + Type: "object", + Properties: map[string]*OpenAPISchema{ + "status": {Type: "string", Enum: []string{"build_ok", "build_failed"}}, + "error": {Type: "string"}, + "moduleTypes": {Type: "array", Items: &OpenAPISchema{Type: "string"}}, + "stepTypes": {Type: "array", Items: &OpenAPISchema{Type: "string"}}, + "triggerTypes": {Type: "array", Items: &OpenAPISchema{Type: "string"}}, + }, + }) + gen.RegisterComponentSchema("ModuleTypeInfo", &OpenAPISchema{ Type: "object", Properties: map[string]*OpenAPISchema{ @@ -573,6 +584,7 @@ func registerEngineOperationSchemas(gen *OpenAPIGenerator) { gen.SetOperationSchema("GET", "/api/v1/admin/engine/modules", nil, SchemaArray(SchemaRef("ModuleTypeInfo"))) gen.SetOperationSchema("POST", "/api/v1/admin/engine/validate", SchemaRef("WorkflowConfig"), SchemaRef("ValidationResult")) gen.SetOperationSchema("POST", "/api/v1/admin/engine/reload", nil, SchemaRef("SuccessResponse")) + gen.SetOperationSchema("POST", "/api/v1/admin/engine/try-activate", SchemaRef("WorkflowConfig"), SchemaRef("TryActivateResult")) } func registerSchemaOperationSchemas(gen *OpenAPIGenerator) { diff --git a/module/openapi_generator_test.go b/module/openapi_generator_test.go index 4c194969..0b386eea 100644 --- a/module/openapi_generator_test.go +++ b/module/openapi_generator_test.go @@ -704,6 +704,7 @@ func TestRegisterAdminSchemas_Integration(t *testing.T) { {Method: "GET", Path: "/api/v1/auth/setup-status", Handler: "auth"}, {Method: "POST", Path: "/api/v1/auth/login", Handler: "auth"}, {Method: "GET", Path: "/api/v1/admin/engine/config", Handler: "engine"}, + {Method: "POST", Path: "/api/v1/admin/engine/try-activate", Handler: "engine"}, {Method: "GET", Path: "/api/v1/admin/companies", Handler: "v1"}, {Method: "POST", Path: "/api/v1/admin/companies", Handler: "v1"}, {Method: "GET", Path: "/api/v1/admin/workflows", Handler: "v1"}, @@ -727,7 +728,7 @@ func TestRegisterAdminSchemas_Integration(t *testing.T) { // Spot-check a few key schemas expectedSchemas := []string{ "LoginRequest", "AuthResponse", "UserProfile", "SetupStatusResponse", - "WorkflowConfig", "EngineStatus", "ModuleSchema", + "WorkflowConfig", "EngineStatus", "TryActivateResult", "ModuleSchema", "AIGenerateRequest", "AIGenerateResponse", "Company", "Organization", "Project", "Workflow", "Execution", "ComponentInfo", "IAMProvider", "DashboardData", "AuditEntry", @@ -764,6 +765,20 @@ func TestRegisterAdminSchemas_Integration(t *testing.T) { t.Errorf("expected $ref to AuthResponse, got %q", loginResp.Ref) } + // POST /api/v1/admin/engine/try-activate should reference TryActivateResult. + tryActivate := spec.Paths["/api/v1/admin/engine/try-activate"] + if tryActivate == nil || tryActivate.Post == nil { + t.Fatal("expected try-activate path") + } + tryActivateReq := tryActivate.Post.RequestBody.Content["application/json"].Schema + if tryActivateReq.Ref != "#/components/schemas/WorkflowConfig" { + t.Errorf("expected $ref to WorkflowConfig, got %q", tryActivateReq.Ref) + } + tryActivateResp := tryActivate.Post.Responses["200"].Content["application/json"].Schema + if tryActivateResp.Ref != "#/components/schemas/TryActivateResult" { + t.Errorf("expected $ref to TryActivateResult, got %q", tryActivateResp.Ref) + } + // GET /api/v1/admin/companies should return array of Company companies := spec.Paths["/api/v1/admin/companies"] if companies == nil || companies.Get == nil {