Skip to content

Commit cf3eda7

Browse files
committed
[executor] Make incoming MESSAGE handling concurrent
OCTRL-204 #close
1 parent 3499ce2 commit cf3eda7

1 file changed

Lines changed: 43 additions & 36 deletions

File tree

executor/handlers.go

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -134,44 +134,51 @@ func handleMessageEvent(state *internalState, data []byte) (err error) {
134134
return
135135
}
136136

137-
var cmd *executorcmd.ExecutorCommand_Transition
138-
cmd, err = state.activeTasks[taskId].UnmarshalTransition(data)
139-
if err != nil {
137+
// Unmarshal and perform transition asynchronously.
138+
// This is not thread-safe but we don't expect the core to spam
139+
// transition requests with no regard for MESSAGEs back.
140+
// If we don't do this, we get a choke point (OCTRL-204)
141+
go func() {
142+
var cmd *executorcmd.ExecutorCommand_Transition
143+
cmd, err = activeTask.UnmarshalTransition(data)
144+
if err != nil {
145+
log.WithFields(logrus.Fields{
146+
"name": cmd.Name,
147+
"message": string(data[:]),
148+
"error": err.Error(),
149+
}).
150+
Error("cannot unmarshal incoming MESSAGE")
151+
return
152+
}
153+
154+
if cmd.Event == "CONFIGURE" {
155+
log.WithFields(logrus.Fields{"map": cmd.Arguments, "taskId": taskId}).Debug("CONFIGURE pushing FairMQ properties")
156+
}
157+
158+
response := activeTask.Transition(cmd)
159+
160+
data, marshalError := json.Marshal(response)
161+
if marshalError != nil {
162+
log.WithFields(logrus.Fields{
163+
"commandName": response.GetCommandName(),
164+
"commandId": response.GetCommandId(),
165+
"error": response.Err().Error(),
166+
"marshalError": marshalError,
167+
}).
168+
Error("cannot marshal MesosCommandResponse for sending as MESSAGE")
169+
return
170+
}
171+
172+
_, _ = state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(data)))
140173
log.WithFields(logrus.Fields{
141-
"name": cmd.Name,
142-
"message": string(data[:]),
143-
"error": err.Error(),
144-
}).
145-
Error("cannot unmarshal incoming MESSAGE")
146-
return
147-
}
148-
149-
if cmd.Event == "CONFIGURE" {
150-
log.WithFields(logrus.Fields{"map": cmd.Arguments, "taskId": taskId}).Debug("CONFIGURE pushing FairMQ properties")
151-
}
152-
153-
response := state.activeTasks[taskId].Transition(cmd)
154-
155-
data, marshalError := json.Marshal(response)
156-
if marshalError != nil {
157-
log.WithFields(logrus.Fields{
158-
"commandName": response.GetCommandName(),
159-
"commandId": response.GetCommandId(),
160-
"error": response.Err().Error(),
161-
"marshalError": marshalError,
162-
}).
163-
Error("cannot marshal MesosCommandResponse for sending as MESSAGE")
164-
return marshalError
165-
}
174+
"commandName": response.GetCommandName(),
175+
"commandId": response.GetCommandId(),
176+
"error": response.Err().Error(),
177+
"state": response.CurrentState,
178+
}).
179+
Debug("response sent")
180+
}()
166181

167-
_, _ = state.cli.Send(context.TODO(), calls.NonStreaming(calls.Message(data)))
168-
log.WithFields(logrus.Fields{
169-
"commandName": response.GetCommandName(),
170-
"commandId": response.GetCommandId(),
171-
"error": response.Err().Error(),
172-
"state": response.CurrentState,
173-
}).
174-
Debug("response sent")
175182

176183
default:
177184
err = errors.New(fmt.Sprintf("unrecognized controlcommand %s", incoming.Name))

0 commit comments

Comments
 (0)