Skip to content

Commit dec0928

Browse files
miltalexteo
authored andcommitted
[core] fix subscribe to workflow state change
1 parent b156a08 commit dec0928

2 files changed

Lines changed: 37 additions & 35 deletions

File tree

core/environment/manager.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -115,41 +115,6 @@ func (envs *Manager) CreateEnvironment(workflowPath string, userVars map[string]
115115
return env.id, err
116116
}
117117

118-
// This will subscribe to workflow state change. In case of workflow state
119-
// ERROR will transition environment to ERROR state. The goroutine starts
120-
// after a successful transition to CONFIGURE in order to handle only ERROR
121-
// states triggered by mesos.(TASK_LOST,TASK_KILLED,TASK_FAILED,TASK_ERROR)
122-
go func() {
123-
wf := env.Workflow()
124-
notify := make(chan task.State)
125-
subscriptionId := uuid.NewUUID().String()
126-
env.wfAdapter.SubscribeToStateChange(subscriptionId, notify)
127-
defer env.wfAdapter.UnsubscribeFromStateChange(subscriptionId)
128-
129-
wfState := wf.GetState()
130-
if wfState != task.ERROR {
131-
WORKFLOW_STATE_LOOP:
132-
for {
133-
select {
134-
case wfState = <-notify:
135-
if wfState == task.DONE {
136-
break WORKFLOW_STATE_LOOP
137-
}
138-
if wfState == task.ERROR {
139-
env.setState(wfState.String())
140-
break WORKFLOW_STATE_LOOP
141-
}
142-
continue
143-
default:
144-
if _, ok := envs.m[env.id.Array()]; !ok {
145-
break WORKFLOW_STATE_LOOP
146-
}
147-
continue
148-
}
149-
}
150-
}
151-
}()
152-
153118
return env.id, err
154119
}
155120

core/environment/transition_configure.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,44 @@ func (t ConfigureTransition) do(env *Environment) (err error) {
178178

179179
if len(tasks) != 0 {
180180
err = t.taskman.ConfigureTasks(env.Id().Array(), tasks)
181+
if err != nil {
182+
return
183+
}
181184
}
182185

186+
// This will subscribe to workflow state change. In case of workflow state
187+
// ERROR will transition environment to ERROR state. The goroutine starts
188+
// after a successful transition to CONFIGURE in order to handle only ERROR
189+
// states triggered by mesos.(TASK_LOST,TASK_KILLED,TASK_FAILED,TASK_ERROR)
190+
go func() {
191+
wf := env.Workflow()
192+
notify := make(chan task.State)
193+
subscriptionId := uuid.NewUUID().String()
194+
env.wfAdapter.SubscribeToStateChange(subscriptionId, notify)
195+
defer env.wfAdapter.UnsubscribeFromStateChange(subscriptionId)
196+
197+
wfState := wf.GetState()
198+
if wfState != task.ERROR {
199+
WORKFLOW_STATE_LOOP:
200+
for {
201+
select {
202+
case wfState = <-notify:
203+
if wfState == task.DONE {
204+
break WORKFLOW_STATE_LOOP
205+
}
206+
// We kill the goroutine on a reset or a teardown
207+
// of the environment
208+
if wfState == task.STANDBY {
209+
break WORKFLOW_STATE_LOOP
210+
}
211+
if wfState == task.ERROR {
212+
env.setState(wfState.String())
213+
break WORKFLOW_STATE_LOOP
214+
}
215+
}
216+
}
217+
}
218+
}()
219+
183220
return
184221
}

0 commit comments

Comments
 (0)