3636import com .netflix .conductor .core .execution .mapper .TaskMapper ;
3737import com .netflix .conductor .core .execution .mapper .TaskMapperContext ;
3838import com .netflix .conductor .core .execution .tasks .SystemTaskRegistry ;
39+ import com .netflix .conductor .core .listener .TaskStatusListener ;
3940import com .netflix .conductor .core .utils .ExternalPayloadStorageUtils ;
4041import com .netflix .conductor .core .utils .IDGenerator ;
4142import com .netflix .conductor .core .utils .ParametersUtils ;
@@ -65,7 +66,7 @@ public class DeciderService {
6566 private final MetadataDAO metadataDAO ;
6667 private final SystemTaskRegistry systemTaskRegistry ;
6768 private final long taskPendingTimeThresholdMins ;
68-
69+ private final TaskStatusListener taskStatusListener ;
6970 private final Map <String , TaskMapper > taskMappers ;
7071
7172 public DeciderService (
@@ -76,14 +77,16 @@ public DeciderService(
7677 SystemTaskRegistry systemTaskRegistry ,
7778 @ Qualifier ("taskMappersByTaskType" ) Map <String , TaskMapper > taskMappers ,
7879 @ Value ("${conductor.app.taskPendingTimeThreshold:60m}" )
79- Duration taskPendingTimeThreshold ) {
80+ Duration taskPendingTimeThreshold ,
81+ TaskStatusListener taskStatusListener ) {
8082 this .idGenerator = idGenerator ;
8183 this .metadataDAO = metadataDAO ;
8284 this .parametersUtils = parametersUtils ;
8385 this .taskMappers = taskMappers ;
8486 this .externalPayloadStorageUtils = externalPayloadStorageUtils ;
8587 this .taskPendingTimeThresholdMins = taskPendingTimeThreshold .toMinutes ();
8688 this .systemTaskRegistry = systemTaskRegistry ;
89+ this .taskStatusListener = taskStatusListener ;
8790 }
8891
8992 public DeciderOutcome decide (WorkflowModel workflow ) throws TerminateWorkflowException {
@@ -773,10 +776,12 @@ void timeoutTaskWithTimeoutPolicy(String reason, TaskDef taskDef, TaskModel task
773776 case RETRY :
774777 task .setStatus (TIMED_OUT );
775778 task .setReasonForIncompletion (reason );
779+ taskStatusListener .onTaskTimedOut (task );
776780 return ;
777781 case TIME_OUT_WF :
778782 task .setStatus (TIMED_OUT );
779783 task .setReasonForIncompletion (reason );
784+ taskStatusListener .onTaskTimedOut (task );
780785 throw new TerminateWorkflowException (reason , WorkflowModel .Status .TIMED_OUT , task );
781786 }
782787 }
@@ -851,6 +856,7 @@ private void timeoutTask(TaskDef taskDef, TaskModel task) {
851856 LOGGER .debug (reason );
852857 task .setStatus (TIMED_OUT );
853858 task .setReasonForIncompletion (reason );
859+ taskStatusListener .onTaskTimedOut (task );
854860 }
855861
856862 public List <TaskModel > getTasksToBeScheduled (
0 commit comments