From 33759d07c4f0337601578ab6225d83409fb616c0 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Fri, 22 Dec 2023 11:28:16 +0100 Subject: [PATCH 01/11] TASK: add another assertion to testcases to ensure they ran successfully --- prunner_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/prunner_test.go b/prunner_test.go index cb1d1e1..ac6c245 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -161,6 +161,7 @@ func TestPipelineRunner_ScheduleAsync_WithEmptyScriptTask(t *testing.T) { require.NoError(t, err) waitForCompletedJob(t, pRunner, job.ID) + assert.NoError(t, job.LastError) } func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { @@ -213,6 +214,7 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { require.NoError(t, err) waitForCompletedJob(t, pRunner, job.ID) + assert.NoError(t, job.LastError) pipelineVarTaskOutput := store.GetBytes(job.ID.String(), "pipeline_var", "stdout") assert.Equal(t, "from pipeline,from pipeline,from process", string(pipelineVarTaskOutput), "output of pipeline_var") @@ -325,6 +327,7 @@ func TestPipelineRunner_CancelJob_WithQueuedJob(t *testing.T) { waitForCompletedJob(t, pRunner, job1.ID) waitForCanceledJob(t, pRunner, job2.ID) waitForCompletedJob(t, pRunner, job3.ID) + assert.NoError(t, job1.LastError) assert.Nil(t, job2.Start, "job 2 should not be started") assert.Equal(t, true, job2.Tasks.ByName("sleep").Canceled, "job 2 task was marked as canceled") @@ -426,6 +429,7 @@ func TestPipelineRunner_FirstErroredTaskShouldCancelAllRunningTasks_ByDefault(t jobID := job.ID waitForCompletedJob(t, pRunner, jobID) + assert.Error(t, job.LastError) assert.True(t, job.Tasks.ByName("err").Errored, "err task was errored") assert.True(t, job.Tasks.ByName("ok").Canceled, "ok task should be cancelled") From 0b5f67c4b5ecd955cecede98bb6bd4e9c4cdaee4 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Fri, 22 Dec 2023 13:23:51 +0100 Subject: [PATCH 02/11] FEATURE: allow specifying on_error handler in pipeline this on_error handler is executed if any other step in the pipeline failed; and can be e.g. used to trigger a notification on error. --- definition/pipelines.go | 18 +++++ prunner.go | 146 +++++++++++++++++++++++++++++++++++----- prunner_test.go | 131 +++++++++++++++++++++++++++++++++++ 3 files changed, 278 insertions(+), 17 deletions(-) diff --git a/definition/pipelines.go b/definition/pipelines.go index 3b08204..60c9ca8 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool { return true } +// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling. +type OnErrorTaskDef struct { + // Script is a list of shell commands that are executed if an error occurs in a "normal" task + Script []string `yaml:"script"` + + // Env sets/overrides environment variables for this task (takes precedence over pipeline environment) + Env map[string]string `yaml:"env"` +} + type PipelineDef struct { // Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1) Concurrency int `yaml:"concurrency"` @@ -62,6 +71,15 @@ type PipelineDef struct { Tasks map[string]TaskDef `yaml:"tasks"` + // Script to be executed if this pipeline fails, e.g. for notifications. + // In this script, you have the following variables set: + // - failedTaskName: Name of the failed task (key from pipelines.yml) + // - failedTaskExitCode: Exit code of the failed task + // - failedTaskError: Error message of the failed task + // - failedTaskStdout: Stdout of the failed task + // - failedTaskStderr: Stderr of the failed task + OnError OnErrorTaskDef `yaml:"onError"` + // SourcePath stores the source path where the pipeline was defined SourcePath string } diff --git a/prunner.go b/prunner.go index 1ee864d..966b5af 100644 --- a/prunner.go +++ b/prunner.go @@ -3,6 +3,7 @@ package prunner import ( "context" "fmt" + "io" "sort" "sync" "time" @@ -418,6 +419,39 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { if jt == nil { return } + updateJobTaskStateFromTask(jt, t) + + // if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE), + // then we directly abort all other tasks of the job. + // NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only + // if one task failed, and we want to kill the other tasks. + if jt.Errored { + pipelineDef, found := r.defs.Pipelines[j.Pipeline] + if found && !pipelineDef.ContinueRunningTasksAfterFailure { + log. + WithField("component", "runner"). + WithField("jobID", jobIDString). + WithField("pipeline", j.Pipeline). + WithField("failedTaskName", t.Name). + Debug("Task failed - cancelling all other tasks of the job") + // Use internal cancel since we already have a lock on the mutex + _ = r.cancelJobInternal(jobID) + } + + if found && len(pipelineDef.OnError.Script) > 0 { + // we errored; and there is an onError script defined for the + // current pipeline. So let's run it. + r.runOnErrorScript(t, j, pipelineDef.OnError) + } + } + + r.requestPersist() +} + +// updateJobTaskStateFromTask updates jobTask properties from a given taskCtl task.Task. +// Very internal helper function, to be used in PipelineRunner.HandleTaskChange +// and PipelineRunner.runOnErrorScript. +func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) { if !t.Start.IsZero() { start := t.Start jt.Start = &start @@ -437,25 +471,103 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { jt.Error = t.Error } - // if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE), - // then we directly abort all other tasks of the job. - // NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only - // if one task failed, and we want to kill the other tasks. - if jt.Errored { - pipelineDef, found := r.defs.Pipelines[j.Pipeline] - if found && !pipelineDef.ContinueRunningTasksAfterFailure { - log. - WithField("component", "runner"). - WithField("jobID", jobIDString). - WithField("pipeline", j.Pipeline). - WithField("failedTaskName", t.Name). - Debug("Task failed - cancelling all other tasks of the job") - // Use internal cancel since we already have a lock on the mutex - _ = r.cancelJobInternal(jobID) - } +} + +const OnErrorTaskName = "on_error" + +// runOnErrorScript is responsible for running a special "on_error" script in response to an error in the pipeline. +// It exposes variables containing information about the errored task. +// +// The method is triggered with the errored Task t, belonging to pipelineJob j; and pipelineDev +func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorTaskDef definition.OnErrorTaskDef) { + log. + WithField("component", "runner"). + WithField("jobID", j.ID.String()). + WithField("pipeline", j.Pipeline). + WithField("failedTaskName", t.Name). + Debug("Triggering onError Script because of task failure") + + rc, _ := r.outputStore.Reader(j.ID.String(), t.Name, "stdout") + defer func(rc io.ReadCloser) { + _ = rc.Close() + }(rc) + failedTaskStdout, err := io.ReadAll(rc) + + rc, _ = r.outputStore.Reader(j.ID.String(), t.Name, "stderr") + defer func(rc io.ReadCloser) { + _ = rc.Close() + }(rc) + failedTaskStderr, err := io.ReadAll(rc) + + onErrorVariables := make(map[string]interface{}) + for key, value := range j.Variables { + onErrorVariables[key] = value + } + onErrorVariables["failedTaskName"] = t.Name + onErrorVariables["failedTaskExitCode"] = t.ExitCode + onErrorVariables["failedTaskError"] = t.Error + onErrorVariables["failedTaskStdout"] = string(failedTaskStdout) + onErrorVariables["failedTaskStderr"] = string(failedTaskStderr) + + onErrorJobTask := jobTask{ + TaskDef: definition.TaskDef{ + Script: onErrorTaskDef.Script, + // AllowFailure needs to be FALSE; otherwise lastError below won't be filled (so errors will not appear in the log) + AllowFailure: false, + Env: onErrorTaskDef.Env, + }, + Name: OnErrorTaskName, + Status: toStatus(scheduler.StatusWaiting), } - r.requestPersist() + // store on task list; so that it appears in pipeline and UI etc + j.Tasks = append(j.Tasks, onErrorJobTask) + + onErrorGraph, err := buildPipelineGraph(j.ID, jobTasks{onErrorJobTask}, onErrorVariables) + if err != nil { + log. + WithError(err). + WithField("jobID", j.ID). + WithField("pipeline", j.Pipeline). + Error("Failed to build onError pipeline graph") + onErrorJobTask.Error = err + onErrorJobTask.Errored = true + + // the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to + // store it again after modifying it. + j.Tasks[len(j.Tasks)-1] = onErrorJobTask + return + } + + // we use a detached taskRunner and scheduler to run the onError task, to + // run synchronously (as we are already in an async goroutine here), won't have any cycles, + // and to simplify the code. + taskRunner := r.createTaskRunner(j) + sched := taskctl.NewScheduler(taskRunner) + + // Now, actually run the job synchronously + lastErr := sched.Schedule(onErrorGraph) + + // Update job status as with normal jobs + onErrorJobTask.Status = toStatus(onErrorGraph.Nodes()[OnErrorTaskName].ReadStatus()) + updateJobTaskStateFromTask(&onErrorJobTask, onErrorGraph.Nodes()[OnErrorTaskName].Task) + + if lastErr != nil { + log. + WithError(err). + WithField("jobID", j.ID). + WithField("pipeline", j.Pipeline). + Error("Error running the onError handler") + } else { + log. + WithField("jobID", j.ID). + WithField("pipeline", j.Pipeline). + Debug("Successfully ran the onError handler") + } + + // the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to + // store it again after modifying it. + j.Tasks[len(j.Tasks)-1] = onErrorJobTask } // HandleStageChange will be called when the stage state changes in the scheduler diff --git a/prunner_test.go b/prunner_test.go index ac6c245..1a33ed7 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -223,6 +223,137 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { assert.Equal(t, "from task,from pipeline,from process", string(taskVarTaskOutput), "output of task_var") } +func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "erroring_script": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "a": { + Script: []string{"echo A"}, + }, + "b": { + Script: []string{ + "echo stdoutContent", + "echo This message goes to stderr >&2", + "exit 42", + }, + }, + "wait": { + DependsOn: []string{"a", "b"}, + }, + }, + OnError: definition.OnErrorTaskDef{ + Script: []string{ + "echo ON_ERROR", + "echo 'Failed Task Name: {{ .failedTaskName }}'", + "echo 'Failed Task Exit Code: {{ .failedTaskExitCode }}'", + "echo 'Failed Task Error: {{ .failedTaskError }}'", + "echo 'Failed Task Stdout: {{ .failedTaskStdout }}'", + "echo 'Failed Task Stderr: {{ .failedTaskStderr }}'", + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockOutputStore := test.NewMockOutputStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore) + return taskRunner + }, nil, mockOutputStore) + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{}) + require.NoError(t, err) + + waitForCompletedJob(t, pRunner, job.ID) + res := mockOutputStore.GetBytes(job.ID.String(), "on_error", "stdout") + assert.Error(t, job.LastError) + assert.Equal(t, `ON_ERROR +Failed Task Name: b +Failed Task Exit Code: 42 +Failed Task Error: exit status 42 +Failed Task Stdout: stdoutContent + +Failed Task Stderr: This message goes to stderr + +`, string(res)) + + jt := job.Tasks.ByName("on_error") + if assert.NotNil(t, jt) { + assert.False(t, jt.Canceled, "onError task was not marked as canceled") + assert.False(t, jt.Errored, "task was not marked as errored") + assert.Equal(t, "done", jt.Status, "task has status done") + assert.Nil(t, jt.Error, "task has no error set") + } +} + +func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndSetsStateCorrectlyIfErrorHookFails(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "erroring_script": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "a": { + Script: []string{"echo A"}, + }, + "b": { + Script: []string{ + "echo stdoutContent", + "echo This message goes to stderr >&2", + "exit 42", + }, + }, + "wait": { + DependsOn: []string{"a", "b"}, + }, + }, + OnError: definition.OnErrorTaskDef{ + Script: []string{ + "echo ON_ERROR", + "exit 1", + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockOutputStore := test.NewMockOutputStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore) + return taskRunner + }, nil, mockOutputStore) + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{}) + require.NoError(t, err) + + waitForCompletedJob(t, pRunner, job.ID) + assert.Error(t, job.LastError) + + jt := job.Tasks.ByName("on_error") + if assert.NotNil(t, jt) { + assert.False(t, jt.Canceled, "onError task was not marked as canceled") + assert.True(t, jt.Errored, "task was not marked as errored") + assert.Equal(t, "error", jt.Status, "task has status done") + assert.NotNil(t, jt.Error, "task has no error set") + } +} func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{ From ee0f30551caa491187573252f186577eb6017f43 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Fri, 22 Dec 2023 13:43:30 +0100 Subject: [PATCH 03/11] fix issue detected by linter -> improve error handling --- prunner.go | 62 ++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/prunner.go b/prunner.go index 966b5af..d1eb480 100644 --- a/prunner.go +++ b/prunner.go @@ -487,17 +487,57 @@ func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorT WithField("failedTaskName", t.Name). Debug("Triggering onError Script because of task failure") - rc, _ := r.outputStore.Reader(j.ID.String(), t.Name, "stdout") - defer func(rc io.ReadCloser) { - _ = rc.Close() - }(rc) - failedTaskStdout, err := io.ReadAll(rc) - - rc, _ = r.outputStore.Reader(j.ID.String(), t.Name, "stderr") - defer func(rc io.ReadCloser) { - _ = rc.Close() - }(rc) - failedTaskStderr, err := io.ReadAll(rc) + var failedTaskStdout []byte + rc, err := r.outputStore.Reader(j.ID.String(), t.Name, "stdout") + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", j.ID.String()). + WithField("pipeline", j.Pipeline). + WithField("failedTaskName", t.Name). + WithError(err). + Debug("Could not create stdoutReader for failed task") + } else { + defer func(rc io.ReadCloser) { + _ = rc.Close() + }(rc) + failedTaskStdout, err = io.ReadAll(rc) + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", j.ID.String()). + WithField("pipeline", j.Pipeline). + WithField("failedTaskName", t.Name). + WithError(err). + Debug("Could not read stdout of failed task") + } + } + + var failedTaskStderr []byte + rc, err = r.outputStore.Reader(j.ID.String(), t.Name, "stderr") + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", j.ID.String()). + WithField("pipeline", j.Pipeline). + WithField("failedTaskName", t.Name). + WithError(err). + Debug("Could not create stderrReader for failed task") + } else { + defer func(rc io.ReadCloser) { + _ = rc.Close() + }(rc) + failedTaskStderr, err = io.ReadAll(rc) + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", j.ID.String()). + WithField("pipeline", j.Pipeline). + WithField("failedTaskName", t.Name). + WithError(err). + Debug("Could not read stderr of failed task") + } + } onErrorVariables := make(map[string]interface{}) for key, value := range j.Variables { From f64ad9469834e8a4a580d504d76791112a8dbd86 Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 16 Jan 2024 19:14:35 +0100 Subject: [PATCH 04/11] Cosmetic changes while reviewing / added TODO --- definition/pipelines.go | 6 ++++-- prunner.go | 9 +++++---- taskctl/runner.go | 4 ++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/definition/pipelines.go b/definition/pipelines.go index 60c9ca8..d8bfa08 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -69,10 +69,12 @@ type PipelineDef struct { // Env sets/overrides environment variables for all tasks (takes precedence over process environment) Env map[string]string `yaml:"env"` + // Tasks is a map of task names to task definitions Tasks map[string]TaskDef `yaml:"tasks"` - // Script to be executed if this pipeline fails, e.g. for notifications. - // In this script, you have the following variables set: + // Task to be executed if this pipeline fails, e.g. for notifications. + // + // In this task, you have the following variables set: // - failedTaskName: Name of the failed task (key from pipelines.yml) // - failedTaskExitCode: Exit code of the failed task // - failedTaskError: Error message of the failed task diff --git a/prunner.go b/prunner.go index d1eb480..41a8404 100644 --- a/prunner.go +++ b/prunner.go @@ -421,7 +421,7 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { } updateJobTaskStateFromTask(jt, t) - // if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE), + // If the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is false), // then we directly abort all other tasks of the job. // NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only // if one task failed, and we want to kill the other tasks. @@ -496,7 +496,7 @@ func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorT WithField("pipeline", j.Pipeline). WithField("failedTaskName", t.Name). WithError(err). - Debug("Could not create stdoutReader for failed task") + Warn("Could not create stdout reader for failed task") } else { defer func(rc io.ReadCloser) { _ = rc.Close() @@ -509,7 +509,7 @@ func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorT WithField("pipeline", j.Pipeline). WithField("failedTaskName", t.Name). WithError(err). - Debug("Could not read stdout of failed task") + Warn("Could not read stdout of failed task") } } @@ -552,7 +552,7 @@ func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorT onErrorJobTask := jobTask{ TaskDef: definition.TaskDef{ Script: onErrorTaskDef.Script, - // AllowFailure needs to be FALSE; otherwise lastError below won't be filled (so errors will not appear in the log) + // AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log) AllowFailure: false, Env: onErrorTaskDef.Env, }, @@ -962,6 +962,7 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { // Wait for all running jobs to have called JobCompleted r.wg.Wait() + // TODO This is not safe to do outside of the requestPersist loop, since we might have a save in progress. So we need to wait until the save loop is finished before calling SaveToStore. // Do a final save to include the state of recently completed jobs r.SaveToStore() }() diff --git a/taskctl/runner.go b/taskctl/runner.go index 4bf0480..dec2fd4 100644 --- a/taskctl/runner.go +++ b/taskctl/runner.go @@ -166,8 +166,8 @@ func (r *TaskRunner) Run(t *task.Task) error { // but this lead to a huge memory leak because the full job output was retained // in memory forever. // This enabled features of taskctl like {{ .Tasks.TASKNAME.Output }} and {{.Output}}, - // but we never promised these features. Thus it is fine to not log to stdout and stderr - // into a Buffer, but directly to a file. + // but we never promised these features. Thus, it is fine to not log stdout and stderr + // into a Buffer, but directly to the output store. stdoutWriter []io.Writer stderrWriter []io.Writer ) From 107666277cedd20ba8228ca8f7f740b79389a0fc Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Tue, 20 May 2025 12:26:09 +0200 Subject: [PATCH 05/11] re-implement on-error handling --- definition/pipelines.go | 2 +- prunner.go | 304 +++++++++++++++++++++------------------- prunner_test.go | 65 ++++++++- taskctl/scheduler.go | 1 + 4 files changed, 225 insertions(+), 147 deletions(-) diff --git a/definition/pipelines.go b/definition/pipelines.go index d8bfa08..9155012 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -80,7 +80,7 @@ type PipelineDef struct { // - failedTaskError: Error message of the failed task // - failedTaskStdout: Stdout of the failed task // - failedTaskStderr: Stderr of the failed task - OnError OnErrorTaskDef `yaml:"onError"` + OnError *OnErrorTaskDef `yaml:"onError"` // SourcePath stores the source path where the pipeline was defined SourcePath string diff --git a/prunner.go b/prunner.go index 41a8404..145dd3a 100644 --- a/prunner.go +++ b/prunner.go @@ -42,7 +42,7 @@ type PipelineRunner struct { // externally, call requestPersist() persistRequests chan struct{} - // Mutex for reading or writing jobs and job state + // Mutex for reading or writing pipeline definitions (defs), jobs and job state mx sync.RWMutex createTaskRunner func(j *PipelineJob) taskctl.Runner @@ -104,7 +104,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat // It can be scheduled (in the waitListByPipeline of PipelineRunner), // or currently running (jobsByID / jobsByPipeline in PipelineRunner). type PipelineJob struct { - ID uuid.UUID + ID uuid.UUID + // Identifier of the pipeline (from the YAML file) Pipeline string Env map[string]string Variables map[string]interface{} @@ -195,6 +196,10 @@ var ErrJobNotFound = errors.New("job not found") var errJobAlreadyCompleted = errors.New("job is already completed") var ErrShuttingDown = errors.New("runner is shutting down") +// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it. +// "pipeline" is the pipeline ID from the YAML file. +// +// the returned PipelineJob is the individual execution context. func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) { r.mx.Lock() defer r.mx.Unlock() @@ -399,11 +404,163 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { go func() { defer r.wg.Done() lastErr := job.sched.Schedule(graph) + if lastErr != nil { + r.RunJobErrorHandler(job) + } r.JobCompleted(job.ID, lastErr) }() } +func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { + errorGraph, err := r.buildErrorGraph(job) + if err != nil { + log. + WithError(err). + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Error("Failed to build error pipeline graph") + // At this point, an error with the error handling happened - duh... + // Nothing we can do at this point. + return + } + + // if errorGraph is nil (and no error); no error handling configured for task. + if errorGraph != nil { + // re-init scheduler, as we need a new one to schedule the error on. (the old one is already shut down + // if ContinueRunningTasksAfterFailure == false) + r.mx.Lock() + r.initScheduler(job) + r.mx.Unlock() + + err = job.sched.Schedule(errorGraph) + + if err != nil { + log. + WithError(err). + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Error("Failed to run error handling for job") + } else { + log. + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Info("error handling completed") + } + } +} + +func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { + r.mx.RLock() + defer r.mx.RUnlock() + pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline] + if !pipelineDefExists { + return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline) + } + onErrorTaskDef := pipelineDef.OnError + if onErrorTaskDef == nil { + // no error, but no error handling configured + return nil, nil + } + + // we assume the 1st failed task (by end date) is the root cause, because this triggered a cascading abort then. + failedTask := findFirstFailedTaskByEndDate(job.Tasks) + + failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout") + failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr") + + onErrorVariables := make(map[string]interface{}) + for key, value := range job.Variables { + onErrorVariables[key] = value + } + // TODO: find first failed task (by End Date) + + if failedTask != nil { + onErrorVariables["failedTaskName"] = failedTask.Name + onErrorVariables["failedTaskExitCode"] = failedTask.ExitCode + onErrorVariables["failedTaskError"] = failedTask.Error + onErrorVariables["failedTaskStdout"] = string(failedTaskStdout) + onErrorVariables["failedTaskStderr"] = string(failedTaskStderr) + } else { + onErrorVariables["failedTaskName"] = "task_not_identified_should_not_happen" + onErrorVariables["failedTaskExitCode"] = "99" + onErrorVariables["failedTaskError"] = "task_not_identified_should_not_happen" + onErrorVariables["failedTaskStdout"] = "task_not_identified_should_not_happen" + onErrorVariables["failedTaskStderr"] = "task_not_identified_should_not_happen" + } + + onErrorJobTask := jobTask{ + TaskDef: definition.TaskDef{ + Script: onErrorTaskDef.Script, + // AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log) + AllowFailure: false, + Env: onErrorTaskDef.Env, + }, + Name: OnErrorTaskName, + Status: toStatus(scheduler.StatusWaiting), + } + job.Tasks = append(job.Tasks, onErrorJobTask) + + return buildPipelineGraph(job.ID, jobTasks{onErrorJobTask}, onErrorVariables) +} + +func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTask, outputName string) []byte { + if task == nil || job == nil { + return []byte(nil) + } + + rc, err := r.outputStore.Reader(job.ID.String(), task.Name, outputName) + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", job.ID.String()). + WithField("pipeline", job.Pipeline). + WithField("failedTaskName", task.Name). + WithField("outputName", outputName). + WithError(err). + Debug("Could not create stderrReader for failed task") + return []byte(nil) + } else { + defer func(rc io.ReadCloser) { + _ = rc.Close() + }(rc) + outputAsBytes, err := io.ReadAll(rc) + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", job.ID.String()). + WithField("pipeline", job.Pipeline). + WithField("failedTaskName", task.Name). + WithField("outputName", outputName). + WithError(err). + Debug("Could not read output of task") + } + + return outputAsBytes + } + +} -// HandleTaskChange will be called when the task state changes in the task runner +// FindFirstFailedTaskByEndDate returns the first failed task ordered by End Date +// A task is considered failed if it has errored or has a non-zero exit code +func findFirstFailedTaskByEndDate(tasks jobTasks) *jobTask { + var firstFailedTask *jobTask + + for i := range tasks { + task := &tasks[i] + + // Check if the task failed (has an error or non-zero exit code) + if task.Errored { + // If this is our first failed task or this one ended earlier than our current earliest + if firstFailedTask == nil || task.End.Before(*firstFailedTask.End) { + firstFailedTask = task + } + } + } + + return firstFailedTask +} + +// HandleTaskChange will be called when the task state changes in the task runner (taskctl) +// it is short-lived and updates our JobTask state accordingly. func (r *PipelineRunner) HandleTaskChange(t *task.Task) { r.mx.Lock() defer r.mx.Unlock() @@ -437,12 +594,6 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { // Use internal cancel since we already have a lock on the mutex _ = r.cancelJobInternal(jobID) } - - if found && len(pipelineDef.OnError.Script) > 0 { - // we errored; and there is an onError script defined for the - // current pipeline. So let's run it. - r.runOnErrorScript(t, j, pipelineDef.OnError) - } } r.requestPersist() @@ -475,141 +626,6 @@ func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) { const OnErrorTaskName = "on_error" -// runOnErrorScript is responsible for running a special "on_error" script in response to an error in the pipeline. -// It exposes variables containing information about the errored task. -// -// The method is triggered with the errored Task t, belonging to pipelineJob j; and pipelineDev -func (r *PipelineRunner) runOnErrorScript(t *task.Task, j *PipelineJob, onErrorTaskDef definition.OnErrorTaskDef) { - log. - WithField("component", "runner"). - WithField("jobID", j.ID.String()). - WithField("pipeline", j.Pipeline). - WithField("failedTaskName", t.Name). - Debug("Triggering onError Script because of task failure") - - var failedTaskStdout []byte - rc, err := r.outputStore.Reader(j.ID.String(), t.Name, "stdout") - if err != nil { - log. - WithField("component", "runner"). - WithField("jobID", j.ID.String()). - WithField("pipeline", j.Pipeline). - WithField("failedTaskName", t.Name). - WithError(err). - Warn("Could not create stdout reader for failed task") - } else { - defer func(rc io.ReadCloser) { - _ = rc.Close() - }(rc) - failedTaskStdout, err = io.ReadAll(rc) - if err != nil { - log. - WithField("component", "runner"). - WithField("jobID", j.ID.String()). - WithField("pipeline", j.Pipeline). - WithField("failedTaskName", t.Name). - WithError(err). - Warn("Could not read stdout of failed task") - } - } - - var failedTaskStderr []byte - rc, err = r.outputStore.Reader(j.ID.String(), t.Name, "stderr") - if err != nil { - log. - WithField("component", "runner"). - WithField("jobID", j.ID.String()). - WithField("pipeline", j.Pipeline). - WithField("failedTaskName", t.Name). - WithError(err). - Debug("Could not create stderrReader for failed task") - } else { - defer func(rc io.ReadCloser) { - _ = rc.Close() - }(rc) - failedTaskStderr, err = io.ReadAll(rc) - if err != nil { - log. - WithField("component", "runner"). - WithField("jobID", j.ID.String()). - WithField("pipeline", j.Pipeline). - WithField("failedTaskName", t.Name). - WithError(err). - Debug("Could not read stderr of failed task") - } - } - - onErrorVariables := make(map[string]interface{}) - for key, value := range j.Variables { - onErrorVariables[key] = value - } - onErrorVariables["failedTaskName"] = t.Name - onErrorVariables["failedTaskExitCode"] = t.ExitCode - onErrorVariables["failedTaskError"] = t.Error - onErrorVariables["failedTaskStdout"] = string(failedTaskStdout) - onErrorVariables["failedTaskStderr"] = string(failedTaskStderr) - - onErrorJobTask := jobTask{ - TaskDef: definition.TaskDef{ - Script: onErrorTaskDef.Script, - // AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log) - AllowFailure: false, - Env: onErrorTaskDef.Env, - }, - Name: OnErrorTaskName, - Status: toStatus(scheduler.StatusWaiting), - } - - // store on task list; so that it appears in pipeline and UI etc - j.Tasks = append(j.Tasks, onErrorJobTask) - - onErrorGraph, err := buildPipelineGraph(j.ID, jobTasks{onErrorJobTask}, onErrorVariables) - if err != nil { - log. - WithError(err). - WithField("jobID", j.ID). - WithField("pipeline", j.Pipeline). - Error("Failed to build onError pipeline graph") - onErrorJobTask.Error = err - onErrorJobTask.Errored = true - - // the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to - // store it again after modifying it. - j.Tasks[len(j.Tasks)-1] = onErrorJobTask - return - } - - // we use a detached taskRunner and scheduler to run the onError task, to - // run synchronously (as we are already in an async goroutine here), won't have any cycles, - // and to simplify the code. - taskRunner := r.createTaskRunner(j) - sched := taskctl.NewScheduler(taskRunner) - - // Now, actually run the job synchronously - lastErr := sched.Schedule(onErrorGraph) - - // Update job status as with normal jobs - onErrorJobTask.Status = toStatus(onErrorGraph.Nodes()[OnErrorTaskName].ReadStatus()) - updateJobTaskStateFromTask(&onErrorJobTask, onErrorGraph.Nodes()[OnErrorTaskName].Task) - - if lastErr != nil { - log. - WithError(err). - WithField("jobID", j.ID). - WithField("pipeline", j.Pipeline). - Error("Error running the onError handler") - } else { - log. - WithField("jobID", j.ID). - WithField("pipeline", j.Pipeline). - Debug("Successfully ran the onError handler") - } - - // the last element in the task list is our onErrorJobTask; but because it is not a pointer we need to - // store it again after modifying it. - j.Tasks[len(j.Tasks)-1] = onErrorJobTask -} - // HandleStageChange will be called when the stage state changes in the scheduler func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) { r.mx.Lock() diff --git a/prunner_test.go b/prunner_test.go index 1a33ed7..fdb2dab 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -245,7 +245,7 @@ func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook(t *t DependsOn: []string{"a", "b"}, }, }, - OnError: definition.OnErrorTaskDef{ + OnError: &definition.OnErrorTaskDef{ Script: []string{ "echo ON_ERROR", "echo 'Failed Task Name: {{ .failedTaskName }}'", @@ -318,7 +318,7 @@ func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndS DependsOn: []string{"a", "b"}, }, }, - OnError: definition.OnErrorTaskDef{ + OnError: &definition.OnErrorTaskDef{ Script: []string{ "echo ON_ERROR", "exit 1", @@ -354,6 +354,67 @@ func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndS assert.NotNil(t, jt.Error, "task has no error set") } } + +func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndSetsStateCorrectlyIfErrorHookUsesUnknownVariables(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "erroring_script": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "a": { + Script: []string{"echo A"}, + }, + "b": { + Script: []string{ + "echo stdoutContent", + "echo This message goes to stderr >&2", + "exit 42", + }, + }, + "wait": { + DependsOn: []string{"a", "b"}, + }, + }, + OnError: &definition.OnErrorTaskDef{ + Script: []string{ + "echo ON_ERROR", + "echo {{ .does_not_exist_and_error }}", + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockOutputStore := test.NewMockOutputStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore) + return taskRunner + }, nil, mockOutputStore) + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{}) + require.NoError(t, err) + + waitForCompletedJob(t, pRunner, job.ID) + assert.Error(t, job.LastError) + + jt := job.Tasks.ByName("on_error") + if assert.NotNil(t, jt) { + assert.False(t, jt.Canceled, "onError task was not marked as canceled") + assert.True(t, jt.Errored, "task was not marked as errored") + assert.Equal(t, "error", jt.Status, "task has status done") + assert.NotNil(t, jt.Error, "task has no error set") + assert.Equal(t, "template: interpolate:1:8: executing \"interpolate\" at <.does_not_exist_and_error>: map has no entry for key \"does_not_exist_and_error\"", jt.Error.Error(), "task has wrong error message") + } +} + func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{ diff --git a/taskctl/scheduler.go b/taskctl/scheduler.go index 7ae2271..d2c6a69 100644 --- a/taskctl/scheduler.go +++ b/taskctl/scheduler.go @@ -39,6 +39,7 @@ func (s *Scheduler) OnStageChange(f func(stage *scheduler.Stage)) { // Schedule starts execution of the given ExecutionGraph // // Modified to notify on stage changes +// Method blocks until FULL EXECUTION is completed func (s *Scheduler) Schedule(g *scheduler.ExecutionGraph) error { var wg sync.WaitGroup From 7df47984759f23c4b632453d4c90640c5b7e55a3 Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Tue, 20 May 2025 12:28:28 +0200 Subject: [PATCH 06/11] code cleanup --- prunner.go | 48 ++++++++++++++++++++---------------------------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/prunner.go b/prunner.go index 145dd3a..58bdc42 100644 --- a/prunner.go +++ b/prunner.go @@ -448,6 +448,8 @@ func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { } } +const OnErrorTaskName = "on_error" + func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { r.mx.RLock() defer r.mx.RUnlock() @@ -576,7 +578,24 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { if jt == nil { return } - updateJobTaskStateFromTask(jt, t) + if !t.Start.IsZero() { + start := t.Start + jt.Start = &start + } + if !t.End.IsZero() { + end := t.End + jt.End = &end + } + jt.ExitCode = t.ExitCode + jt.Skipped = t.Skipped + + // Set canceled flag on the job if a task was canceled through the context + if errors.Is(t.Error, context.Canceled) { + jt.Canceled = true + } else { + jt.Errored = t.Errored + jt.Error = t.Error + } // If the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is false), // then we directly abort all other tasks of the job. @@ -599,33 +618,6 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { r.requestPersist() } -// updateJobTaskStateFromTask updates jobTask properties from a given taskCtl task.Task. -// Very internal helper function, to be used in PipelineRunner.HandleTaskChange -// and PipelineRunner.runOnErrorScript. -func updateJobTaskStateFromTask(jt *jobTask, t *task.Task) { - if !t.Start.IsZero() { - start := t.Start - jt.Start = &start - } - if !t.End.IsZero() { - end := t.End - jt.End = &end - } - jt.ExitCode = t.ExitCode - jt.Skipped = t.Skipped - - // Set canceled flag on the job if a task was canceled through the context - if errors.Is(t.Error, context.Canceled) { - jt.Canceled = true - } else { - jt.Errored = t.Errored - jt.Error = t.Error - } - -} - -const OnErrorTaskName = "on_error" - // HandleStageChange will be called when the stage state changes in the scheduler func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) { r.mx.Lock() From cd803ffb9f6b49fbbf42fa9a7d9d77929ccda86d Mon Sep 17 00:00:00 2001 From: Sebastian Kurfuerst Date: Tue, 27 May 2025 10:47:52 +0200 Subject: [PATCH 07/11] BUGFIX: fix NIL error on error handling --- prunner.go | 11 ++++++++++- prunner_test.go | 7 ++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/prunner.go b/prunner.go index 58bdc42..bcf6307 100644 --- a/prunner.go +++ b/prunner.go @@ -552,7 +552,11 @@ func findFirstFailedTaskByEndDate(tasks jobTasks) *jobTask { // Check if the task failed (has an error or non-zero exit code) if task.Errored { // If this is our first failed task or this one ended earlier than our current earliest - if firstFailedTask == nil || task.End.Before(*firstFailedTask.End) { + if firstFailedTask == nil { + // we did not see any failed task yet. remember this one as the 1st failed task. + firstFailedTask = task + } else if firstFailedTask.End != nil && task.End != nil && task.End.Before(*firstFailedTask.End) { + // this task has failed EARLIER than the one we already remembered. firstFailedTask = task } } @@ -602,6 +606,11 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { // NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only // if one task failed, and we want to kill the other tasks. if jt.Errored { + if jt.End == nil { + // Remember ending time in case of error (we need this to identify the correct onError hook) + now := time.Now() + jt.End = &now + } pipelineDef, found := r.defs.Pipelines[j.Pipeline] if found && !pipelineDef.ContinueRunningTasksAfterFailure { log. diff --git a/prunner_test.go b/prunner_test.go index fdb2dab..394195f 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -234,15 +234,20 @@ func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook(t *t "a": { Script: []string{"echo A"}, }, + "c": { + Script: []string{"sleep 10; exit 42"}, + DependsOn: []string{"b"}, + }, "b": { Script: []string{ "echo stdoutContent", "echo This message goes to stderr >&2", "exit 42", }, + DependsOn: []string{"a"}, }, "wait": { - DependsOn: []string{"a", "b"}, + DependsOn: []string{"a", "b", "c"}, }, }, OnError: &definition.OnErrorTaskDef{ From 8d44d0475378b6bc1044cfcc4f6aec117b428c95 Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 10 Jun 2025 17:52:17 +0200 Subject: [PATCH 08/11] fix: implement save synchronization on shutdown, refactor first failed task handling --- prunner.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/prunner.go b/prunner.go index bcf6307..8e05053 100644 --- a/prunner.go +++ b/prunner.go @@ -41,6 +41,7 @@ type PipelineRunner struct { // persistRequests is for triggering saving-the-store, which is then handled asynchronously, at most every 3 seconds (see NewPipelineRunner) // externally, call requestPersist() persistRequests chan struct{} + persistLoopDone chan struct{} // Mutex for reading or writing pipeline definitions (defs), jobs and job state mx sync.RWMutex @@ -50,6 +51,8 @@ type PipelineRunner struct { wg sync.WaitGroup // Flag if the runner is shutting down isShuttingDown bool + // shutdownCancel is the cancel function for the shutdown context (will stop persist loop) + shutdownCancel context.CancelFunc // Poll interval for completed jobs for graceful shutdown ShutdownPollInterval time.Duration @@ -57,6 +60,8 @@ type PipelineRunner struct { // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, createTaskRunner func(j *PipelineJob) taskctl.Runner, store store.DataStore, outputStore taskctl.OutputStore) (*PipelineRunner, error) { + ctx, cancel := context.WithCancel(ctx) + pRunner := &PipelineRunner{ defs: defs, // jobsByID contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled. @@ -69,6 +74,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat outputStore: outputStore, // Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking persistRequests: make(chan struct{}, 1), + persistLoopDone: make(chan struct{}), + shutdownCancel: cancel, createTaskRunner: createTaskRunner, ShutdownPollInterval: 3 * time.Second, } @@ -80,6 +87,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat } go func() { + defer close(pRunner.persistLoopDone) // Signal that the persist loop is done on shutdown + for { select { case <-ctx.Done(): @@ -123,6 +132,8 @@ type PipelineJob struct { // Tasks is an in-memory representation with state of tasks, sorted by dependencies Tasks jobTasks LastError error + // firstFailedTask is a reference to the first task that failed in this job + firstFailedTask *jobTask sched *taskctl.Scheduler taskRunner runner.Runner @@ -411,7 +422,7 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { }() } func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { - errorGraph, err := r.buildErrorGraph(job) + errorGraph, err := r.BuildErrorGraph(job) if err != nil { log. WithError(err). @@ -450,9 +461,10 @@ func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { const OnErrorTaskName = "on_error" -func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { +func (r *PipelineRunner) BuildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { r.mx.RLock() defer r.mx.RUnlock() + pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline] if !pipelineDefExists { return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline) @@ -463,8 +475,7 @@ func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.Execution return nil, nil } - // we assume the 1st failed task (by end date) is the root cause, because this triggered a cascading abort then. - failedTask := findFirstFailedTaskByEndDate(job.Tasks) + failedTask := job.firstFailedTask failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout") failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr") @@ -473,7 +484,6 @@ func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.Execution for key, value := range job.Variables { onErrorVariables[key] = value } - // TODO: find first failed task (by End Date) if failedTask != nil { onErrorVariables["failedTaskName"] = failedTask.Name @@ -606,10 +616,9 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { // NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only // if one task failed, and we want to kill the other tasks. if jt.Errored { - if jt.End == nil { - // Remember ending time in case of error (we need this to identify the correct onError hook) - now := time.Now() - jt.End = &now + if j.firstFailedTask == nil { + // Remember the first failed task for later use in the error handling + j.firstFailedTask = jt } pipelineDef, found := r.defs.Pipelines[j.Pipeline] if found && !pipelineDef.ContinueRunningTasksAfterFailure { @@ -979,13 +988,16 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { // Wait for all running jobs to have called JobCompleted r.wg.Wait() - // TODO This is not safe to do outside of the requestPersist loop, since we might have a save in progress. So we need to wait until the save loop is finished before calling SaveToStore. + // Wait until the persist loop is done + <-r.persistLoopDone // Do a final save to include the state of recently completed jobs r.SaveToStore() }() r.mx.Lock() r.isShuttingDown = true + r.shutdownCancel() + // Cancel all jobs on wait list for pipelineName, jobs := range r.waitListByPipeline { for _, job := range jobs { From cfe90180362aa2ef0d3cf050a21a13ea7c50f33f Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 10 Jun 2025 18:16:10 +0200 Subject: [PATCH 09/11] fix: fixed a data race with WaitGroup, removed unused code --- prunner.go | 95 +++++++++++++++++++----------------------------------- 1 file changed, 33 insertions(+), 62 deletions(-) diff --git a/prunner.go b/prunner.go index 8e05053..3a311b1 100644 --- a/prunner.go +++ b/prunner.go @@ -412,17 +412,19 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { // Run graph asynchronously r.wg.Add(1) - go func() { + go func(sched *taskctl.Scheduler) { defer r.wg.Done() - lastErr := job.sched.Schedule(graph) + lastErr := sched.Schedule(graph) if lastErr != nil { r.RunJobErrorHandler(job) } - r.JobCompleted(job.ID, lastErr) - }() + r.JobCompleted(job, lastErr) + }(job.sched) } func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { - errorGraph, err := r.BuildErrorGraph(job) + r.mx.Lock() + errorGraph, err := r.buildErrorGraph(job) + r.mx.Unlock() if err != nil { log. WithError(err). @@ -435,36 +437,35 @@ func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { } // if errorGraph is nil (and no error); no error handling configured for task. - if errorGraph != nil { - // re-init scheduler, as we need a new one to schedule the error on. (the old one is already shut down - // if ContinueRunningTasksAfterFailure == false) - r.mx.Lock() - r.initScheduler(job) - r.mx.Unlock() + if errorGraph == nil { + return + } - err = job.sched.Schedule(errorGraph) + // re-init scheduler, as we need a new one to schedule the error on. (the old one is already shut down + // if ContinueRunningTasksAfterFailure == false) + r.mx.Lock() + r.initScheduler(job) + r.mx.Unlock() - if err != nil { - log. - WithError(err). - WithField("jobID", job.ID). - WithField("pipeline", job.Pipeline). - Error("Failed to run error handling for job") - } else { - log. - WithField("jobID", job.ID). - WithField("pipeline", job.Pipeline). - Info("error handling completed") - } + err = job.sched.Schedule(errorGraph) + + if err != nil { + log. + WithError(err). + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Error("Failed to run error handling for job") + } else { + log. + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Info("error handling completed") } } const OnErrorTaskName = "on_error" -func (r *PipelineRunner) BuildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { - r.mx.RLock() - defer r.mx.RUnlock() - +func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline] if !pipelineDefExists { return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline) @@ -551,30 +552,6 @@ func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTas } -// FindFirstFailedTaskByEndDate returns the first failed task ordered by End Date -// A task is considered failed if it has errored or has a non-zero exit code -func findFirstFailedTaskByEndDate(tasks jobTasks) *jobTask { - var firstFailedTask *jobTask - - for i := range tasks { - task := &tasks[i] - - // Check if the task failed (has an error or non-zero exit code) - if task.Errored { - // If this is our first failed task or this one ended earlier than our current earliest - if firstFailedTask == nil { - // we did not see any failed task yet. remember this one as the 1st failed task. - firstFailedTask = task - } else if firstFailedTask.End != nil && task.End != nil && task.End.Before(*firstFailedTask.End) { - // this task has failed EARLIER than the one we already remembered. - firstFailedTask = task - } - } - } - - return firstFailedTask -} - // HandleTaskChange will be called when the task state changes in the task runner (taskctl) // it is short-lived and updates our JobTask state accordingly. func (r *PipelineRunner) HandleTaskChange(t *task.Task) { @@ -662,15 +639,10 @@ func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) { r.requestPersist() } -func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) { +func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) { r.mx.Lock() defer r.mx.Unlock() - job := r.jobsByID[id] - if job == nil { - return - } - job.deinitScheduler() job.Completed = true @@ -686,7 +658,7 @@ func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) { pipeline := job.Pipeline log. WithField("component", "runner"). - WithField("jobID", id). + WithField("jobID", job.ID). WithField("pipeline", pipeline). Debug("Job completed") @@ -888,9 +860,6 @@ func (r *PipelineRunner) initialLoadFromStore() error { } func (r *PipelineRunner) SaveToStore() { - r.wg.Add(1) - defer r.wg.Done() - log. WithField("component", "runner"). Debugf("Saving job state to data store") @@ -985,11 +954,13 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { log. WithField("component", "runner"). Debugf("Shutting down, waiting for pending operations...") + // Wait for all running jobs to have called JobCompleted r.wg.Wait() // Wait until the persist loop is done <-r.persistLoopDone + // Do a final save to include the state of recently completed jobs r.SaveToStore() }() From a66beb1ca5662d019a77b102d67afb736e27ffa2 Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 10 Jun 2025 19:17:50 +0200 Subject: [PATCH 10/11] refactor: rename onError to on_error in YAML, added example --- definition/pipelines.go | 4 ++-- examples/pipelines.yml | 3 +++ prunner_test.go | 6 +++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/definition/pipelines.go b/definition/pipelines.go index 19b646d..e5d3114 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -72,7 +72,7 @@ type PipelineDef struct { // Tasks is a map of task names to task definitions Tasks map[string]TaskDef `yaml:"tasks"` - // Task to be executed if this pipeline fails, e.g. for notifications. + // Task to be added and executed if this pipeline fails, e.g. for notifications. // // In this task, you have the following variables set: // - failedTaskName: Name of the failed task (key from pipelines.yml) @@ -80,7 +80,7 @@ type PipelineDef struct { // - failedTaskError: Error message of the failed task // - failedTaskStdout: Stdout of the failed task // - failedTaskStderr: Stderr of the failed task - OnError *OnErrorTaskDef `yaml:"onError"` + OnError *OnErrorTaskDef `yaml:"on_error"` // SourcePath stores the source path where the pipeline was defined SourcePath string diff --git a/examples/pipelines.yml b/examples/pipelines.yml index cb70203..00b2b54 100644 --- a/examples/pipelines.yml +++ b/examples/pipelines.yml @@ -60,6 +60,9 @@ pipelines: no-work: script: - go for a walk + on_error: + script: + - echo "Something went wrong, let's handle the error from {{.failedTaskName}}" queue_it: concurrency: 2 diff --git a/prunner_test.go b/prunner_test.go index 394195f..83d2c89 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -294,7 +294,7 @@ Failed Task Stderr: This message goes to stderr jt := job.Tasks.ByName("on_error") if assert.NotNil(t, jt) { - assert.False(t, jt.Canceled, "onError task was not marked as canceled") + assert.False(t, jt.Canceled, "on_error task was not marked as canceled") assert.False(t, jt.Errored, "task was not marked as errored") assert.Equal(t, "done", jt.Status, "task has status done") assert.Nil(t, jt.Error, "task has no error set") @@ -353,7 +353,7 @@ func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndS jt := job.Tasks.ByName("on_error") if assert.NotNil(t, jt) { - assert.False(t, jt.Canceled, "onError task was not marked as canceled") + assert.False(t, jt.Canceled, "on_error task was not marked as canceled") assert.True(t, jt.Errored, "task was not marked as errored") assert.Equal(t, "error", jt.Status, "task has status done") assert.NotNil(t, jt.Error, "task has no error set") @@ -412,7 +412,7 @@ func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndS jt := job.Tasks.ByName("on_error") if assert.NotNil(t, jt) { - assert.False(t, jt.Canceled, "onError task was not marked as canceled") + assert.False(t, jt.Canceled, "on_error task was not marked as canceled") assert.True(t, jt.Errored, "task was not marked as errored") assert.Equal(t, "error", jt.Status, "task has status done") assert.NotNil(t, jt.Error, "task has no error set") From 3265df318b7c78a4f83c85e6d695ae100993357c Mon Sep 17 00:00:00 2001 From: Christopher Hlubek Date: Tue, 10 Jun 2025 19:31:48 +0200 Subject: [PATCH 11/11] doc: added readme, removed unused variables --- README.md | 37 +++++++++++++++++++++++++++++++++++++ prunner.go | 6 ------ 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 131c8be..46ded0f 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,43 @@ pipelines: tasks: # as usual ``` +### Error handling with on_error + +When a pipeline fails due to a task error, you can optionally configure an `on_error` task that will be executed +to handle the failure. This is useful for sending notifications, cleanup operations, or logging failure details. + +```yaml +pipelines: + deployment: + tasks: + build: + script: + - npm run build + deploy: + script: + - ./deploy.sh + depends_on: + - build + on_error: + script: + - echo "Deployment failed! Notifying team..." + - curl -d "Deployment of {{.failedTaskName}} failed" ntfy.sh/mytopic +``` + +The on_error task has access to special variables containing information about the failed task: + +* `failedTaskName`: Name of the task that failed (key from `pipelines`) +* `failedTaskExitCode`: Exit code of the failed task +* `failedTaskError`: Error message of the failed task +* `failedTaskStdout`: Standard output of the failed task +* `failedTaskStderr`: Standard error output of the failed task + +#### Important notes: + +* The `on_error` task only runs when a "normal" task in the pipeline fails +* If the `on_error` task itself fails, the error will be logged but won't trigger another error handler +* The `on_error` task has access to all the same job variables as regular tasks +* Environment variables can be configured for the `on_error` task just like regular tasks ### Configuring retention period diff --git a/prunner.go b/prunner.go index 3a311b1..54531ab 100644 --- a/prunner.go +++ b/prunner.go @@ -492,12 +492,6 @@ func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.Execution onErrorVariables["failedTaskError"] = failedTask.Error onErrorVariables["failedTaskStdout"] = string(failedTaskStdout) onErrorVariables["failedTaskStderr"] = string(failedTaskStderr) - } else { - onErrorVariables["failedTaskName"] = "task_not_identified_should_not_happen" - onErrorVariables["failedTaskExitCode"] = "99" - onErrorVariables["failedTaskError"] = "task_not_identified_should_not_happen" - onErrorVariables["failedTaskStdout"] = "task_not_identified_should_not_happen" - onErrorVariables["failedTaskStderr"] = "task_not_identified_should_not_happen" } onErrorJobTask := jobTask{