Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 74 additions & 18 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ func initManagementHandlers(logger *slog.Logger, engine *workflow.StdEngine, cfg
mgmtHandler.SetReloadFunc(func(newCfg *config.WorkflowConfig) error {
return app.reloadEngine(newCfg)
})
mgmtHandler.SetTryActivateFunc(func(newCfg *config.WorkflowConfig) (*module.TryActivateResult, error) {
return app.tryActivateEngine(newCfg)
})
mgmtHandler.SetStatusFunc(func() map[string]any {
return map[string]any{"status": "running"}
})
Expand Down Expand Up @@ -1051,37 +1054,67 @@ func (app *serverApp) registerPostStartServices(logger *slog.Logger) error {
return nil
}

// reloadEngine stops the current engine, builds a new one from the given config,
// starts it, and re-registers all persistent services with the new Application.
// This preserves all stores, handlers, and database connections across reloads.
// reloadEngine implements a safe try-activate reload:
// 1. Build candidate engine from newCfg (no ports bound, current engine stays live).
// 2. Stop current engine only after the candidate has been built successfully.
// 3. Start candidate engine; on failure rebuild from the previous config and
// restart it (rollback).
//
// Stores, handlers, and database connections stored on serverApp survive
// every reload cycle.
func (app *serverApp) reloadEngine(newCfg *config.WorkflowConfig) error {
logger := app.logger

// Stop the current engine
if stopErr := app.engine.Stop(context.Background()); stopErr != nil {
logger.Warn("Error stopping engine during reload", "error", stopErr)
}

// Build and start a new engine
// Stage 1: Build candidate. Current engine is still live; if this fails
// the old engine continues serving without interruption.
newEngine, _, _, buildErr := buildEngine(newCfg, logger)
if buildErr != nil {
return fmt.Errorf("failed to rebuild engine: %w", buildErr)
return fmt.Errorf("failed to build candidate engine (current engine unchanged): %w", buildErr)
}

// Update the serverApp reference BEFORE registering services,
// since registerManagementServices reads app.engine.
app.engine = newEngine
// Stage 2: Stop the current engine now that a viable candidate exists.
oldEngine := app.engine
oldConfig := app.currentConfig
if stopErr := oldEngine.Stop(context.Background()); stopErr != nil {
logger.Warn("Error stopping engine during reload", "error", stopErr)
}

// Re-register pre-start management services with the new Application
// Update references before registering services (registerManagementServices
// reads app.engine to reach the Application registry).
app.engine = newEngine
app.currentConfig = newCfg
registerManagementServices(logger, app)

// Start the new engine
// Stage 3: Activate candidate.
if startErr := newEngine.Start(context.Background()); startErr != nil {
return fmt.Errorf("failed to start reloaded engine: %w", startErr)
logger.Error("Candidate engine failed to start; attempting rollback to previous config", "error", startErr)
if stopErr := newEngine.Stop(context.Background()); stopErr != nil {
logger.Warn("Failed to stop candidate engine after failed reload start", "error", stopErr)
}

// Rollback: rebuild from previous config and restart.
rollbackEngine, _, _, rollbackBuildErr := buildEngine(oldConfig, logger)
if rollbackBuildErr != nil {
app.currentConfig = oldConfig // keep old config pointer for diagnostics
return fmt.Errorf("reload failed AND rollback build failed — process is degraded: candidate=%w, rollback=%v", startErr, rollbackBuildErr)
}
app.engine = rollbackEngine
app.currentConfig = oldConfig
registerManagementServices(logger, app)
if rollbackStartErr := rollbackEngine.Start(context.Background()); rollbackStartErr != nil {
return fmt.Errorf("reload failed AND rollback start failed — process is degraded: candidate=%w, rollback=%v", startErr, rollbackStartErr)
}
Comment thread
intel352 marked this conversation as resolved.
if app.stores.v1Store != nil {
if regErr := app.registerPostStartServices(logger); regErr != nil {
logger.Warn("Failed to re-register post-start services during rollback", "error", regErr)
}
}
logger.Info("Engine reload rolled back to previous config")
return fmt.Errorf("reload failed (rolled back to previous config): %w", startErr)
}

// Re-register post-start services (stores already initialized, just need
// to be re-registered with the new Application's service registry).
// Stage 4: Re-register post-start services with the new Application's
// service registry (stores are already initialized, just need re-wiring).
if app.stores.v1Store != nil {
if regErr := app.registerPostStartServices(logger); regErr != nil {
return fmt.Errorf("failed to re-register post-start services: %w", regErr)
Expand All @@ -1092,6 +1125,29 @@ func (app *serverApp) reloadEngine(newCfg *config.WorkflowConfig) error {
return nil
}

// tryActivateEngine is a dry-run probe: it asks buildEngine to construct a
// candidate engine from cfg and reports the outcome as a TryActivateResult.
// It only reads app.logger; app.engine and app.currentConfig are never
// stopped, replaced, or otherwise modified here.
//
// On a build failure the returned result has Status "build_failed" and Error
// set to the build error's message, and the build error itself is returned
// alongside it. On success the result has Status "build_ok" and lists the
// module, step, and trigger type names registered on the candidate.
func (app *serverApp) tryActivateEngine(cfg *config.WorkflowConfig) (*module.TryActivateResult, error) {
	probe, _, _, err := buildEngine(cfg, app.logger)
	if err != nil {
		// Hand back both the structured failure and the raw error so the
		// caller can choose whichever representation it needs.
		failed := &module.TryActivateResult{Status: "build_failed", Error: err.Error()}
		return failed, err
	}

	// The candidate built cleanly; describe what it would expose.
	return &module.TryActivateResult{
		Status:       "build_ok",
		ModuleTypes:  probe.RegisteredModuleTypes(),
		StepTypes:    probe.RegisteredStepTypes(),
		TriggerTypes: probe.RegisteredTriggerTypes(),
	}, nil
}

// importBundles imports and deploys workflow bundles specified via --import-bundle.
func (app *serverApp) importBundles(logger *slog.Logger) error {
if *importBundle == "" {
Expand Down
Loading
Loading