@@ -40,6 +40,7 @@ import (
4040 "github.com/AliceO2Group/Control/common/logger/infologger"
4141 "github.com/AliceO2Group/Control/common/runtype"
4242 "github.com/AliceO2Group/Control/common/system"
43+ "github.com/AliceO2Group/Control/common/utils"
4344 "github.com/AliceO2Group/Control/common/utils/uid"
4445 "github.com/AliceO2Group/Control/core/task"
4546 "github.com/AliceO2Group/Control/core/the"
@@ -205,34 +206,50 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
205206}
206207
207208func (env * Environment ) handleHooks (workflow workflow.Role , trigger string ) (err error ) {
208- // First we start any tasks
209+ log .WithField ("partition" , env .id ).Debugf ("begin handling hooks for trigger %s" , trigger )
210+ defer utils .TimeTrack (time .Now (), fmt .Sprintf ("finished handling hooks for trigger %s" , trigger ), log .WithPrefix ("env" ).WithField ("partition" , env .id ))
211+
212+ // Starting point: get all hooks to be started for the current trigger
209213 allHooks := workflow .GetHooksForTrigger (trigger )
214+
215+ // Hooks can be call hooks or task hooks, we do the calls first
210216 callsToStart := allHooks .FilterCalls ()
211217 if len (callsToStart ) != 0 {
212218 // Before we run anything asynchronously we must associate each call we're about
213219 // to start with its corresponding await expression
214220 for _ , call := range callsToStart {
215221 awaitExpr := call .GetTraits ().Await
222+ // If the callsPendingAwait map has no pending calls list for the given await expression,
223+ // we make sure it's created before we add any pending awaits.
216224 if _ , ok := env .callsPendingAwait [awaitExpr ]; ! ok || len (env .callsPendingAwait [awaitExpr ]) == 0 {
217225 env .callsPendingAwait [awaitExpr ] = make (callable.Calls , 0 )
218226 }
219227 env .callsPendingAwait [awaitExpr ] = append (env .callsPendingAwait [awaitExpr ], call )
220228 }
221- callsToStart .StartAll ()
229+ callsToStart .StartAll () // returns immediately (async)
222230 }
223231
224- // Then we take care of any pending hooks, including from the current trigger
232+ // Then we take care of any pending hooks whose await expression corresponds to the current trigger,
233+ // including any tasks that have just been started (for which trigger == call.Trigger == call.Await).
225234 // TODO: this should be further refined by adding priority/weight
226- pendingCalls , ok := env .callsPendingAwait [trigger ]
227235 callErrors := make (map [* callable.Call ]error )
236+ pendingCalls , ok := env .callsPendingAwait [trigger ]
228237 if ok && len (pendingCalls ) != 0 { // there are hooks to take care of
238+ // AwaitAll blocks with no global timeout - it is up to the specific called function to implement
239+ // a timeout internally.
240+ // The Call instance pushes to the call's varStack some special values including the timeout
241+ // (provided by the workflow template). At that point the integration plugin must acquire the
242+ // timeout value and use the Context mechanism or some other approach to ensure the timeouts are
243+ // respected.
244+
229245 callErrors = pendingCalls .AwaitAll ()
230246 }
231247
232248 // Tasks are handled separately for now, and they cannot have trigger!=await
233249 hooksToTrigger := allHooks .FilterTasks ()
234- taskErrors := env .runTasksAsHooks (hooksToTrigger )
250+ taskErrors := env .runTasksAsHooks (hooksToTrigger ) // blocking call, timeouts in executor
235251
252+ // Prepare structures to accumulate errors
236253 allErrors := make (map [callable.Hook ]error )
237254 criticalFailures := make ([]error , 0 )
238255
0 commit comments