Skip to content

Commit 468e8ee

Browse files
committed
[core] Adapt DCS client to new DCS protofile
1 parent 119130f commit 468e8ee

1 file changed

Lines changed: 101 additions & 33 deletions

File tree

core/integration/dcs/plugin.go

Lines changed: 101 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/AliceO2Group/Control/core/integration"
4343
dcspb "github.com/AliceO2Group/Control/core/integration/dcs/protos"
4444
"github.com/AliceO2Group/Control/core/workflow/callable"
45+
"github.com/imdario/mergo"
4546
"github.com/spf13/viper"
4647
"google.golang.org/grpc"
4748
"google.golang.org/grpc/connectivity"
@@ -204,12 +205,43 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
204205
return
205206
}
206207

208+
// Preparing the per-detector request payload
207209
in := dcspb.SorRequest{
208-
Detector: detectors,
209-
RunType: rt,
210-
RunNumber: int32(runNumber64),
211-
Parameters: argMap,
210+
RunType: rt,
211+
RunNumber: int32(runNumber64),
212+
Detectors: make([]*dcspb.DetectorOperationRequest, len(detectors)),
212213
}
214+
for i, det := range detectors {
215+
perDetectorParameters, ok := varStack[strings.ToLower(det.String()) + "_dcs_sor_parameters"]
216+
if !ok {
217+
log.Debug("empty DCS detectors list provided")
218+
perDetectorParameters = "{}"
219+
}
220+
detectorArgMap := make(map[string]string)
221+
bytes := []byte(perDetectorParameters)
222+
err = json.Unmarshal(bytes, &detectorArgMap)
223+
if err != nil {
224+
log.WithError(err).
225+
WithField("detector", det.String()).
226+
Errorf("error processing DCS SOR parameters for detector %s", det.String())
227+
return
228+
}
229+
230+
// Per-detector parameters override any general dcs_sor_parameters
231+
err = mergo.Merge(&detectorArgMap, argMap)
232+
if err != nil {
233+
log.WithError(err).
234+
WithField("detector", det.String()).
235+
Errorf("error building parameter map for detector %s", det.String())
236+
return
237+
}
238+
239+
in.Detectors[i] = &dcspb.DetectorOperationRequest{
240+
Detector: det,
241+
ExtraParameters: detectorArgMap,
242+
}
243+
}
244+
213245
if p.dcsClient == nil {
214246
log.WithError(fmt.Errorf("DCS plugin not initialized")).
215247
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
@@ -230,7 +262,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
230262
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
231263
Error("failed to perform DCS SOR")
232264
}
233-
var dcsEvent *dcspb.Event
265+
var dcsEvent *dcspb.RunEvent
234266
for {
235267
dcsEvent, err = stream.Recv()
236268
if err == io.EOF {
@@ -245,20 +277,22 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
245277
log.WithError(err).Warn("bad DCS event received")
246278
break
247279
}
248-
if dcsEvent.Eventtype == dcspb.EventType_STATE_CHANGE_EVENT {
249-
if strings.Contains(dcsEvent.Parameters, "SOR_FAILURE") {
250-
log.WithField("event", dcsEvent).Warn("DCS SOR failure")
251-
return
252-
}
253-
if strings.Contains(dcsEvent.Parameters, "RUN_OK") {
254-
log.WithField("event", dcsEvent).Debug("DCS SOR success")
255-
envId, ok := varStack["environment_id"]
256-
if !ok {
257-
break
258-
}
259-
p.pendingEORs[envId] = runNumber64
280+
281+
if dcsEvent.GetState() == dcspb.DetectorState_SOR_FAILURE {
282+
log.WithField("event", dcsEvent).
283+
WithField("detector", dcsEvent.GetDetector().String()).
284+
Warn("DCS SOR failure")
285+
return
286+
}
287+
if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK && dcsEvent.GetDetector() == dcspb.Detector_DCS {
288+
log.WithField("event", dcsEvent).
289+
Debug("DCS SOR success")
290+
envId, ok := varStack["environment_id"]
291+
if !ok {
260292
break
261293
}
294+
p.pendingEORs[envId] = runNumber64
295+
break
262296
}
263297
log.WithField("event", dcsEvent).Debug("incoming DCS SOR event")
264298
}
@@ -292,11 +326,42 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
292326
return
293327
}
294328

329+
// Preparing the per-detector request payload
295330
in := dcspb.EorRequest{
296-
Detector: detectors,
297-
RunNumber: int32(runNumber),
298-
Parameters: argMap,
331+
RunNumber: int32(runNumber),
332+
Detectors: make([]*dcspb.DetectorOperationRequest, len(detectors)),
299333
}
334+
for i, det := range detectors {
335+
perDetectorParameters, ok := varStack[strings.ToLower(det.String()) + "_dcs_eor_parameters"]
336+
if !ok {
337+
log.Debug("empty DCS detectors list provided")
338+
perDetectorParameters = "{}"
339+
}
340+
detectorArgMap := make(map[string]string)
341+
bytes := []byte(perDetectorParameters)
342+
err = json.Unmarshal(bytes, &detectorArgMap)
343+
if err != nil {
344+
log.WithError(err).
345+
WithField("detector", det.String()).
346+
Errorf("error processing DCS EOR parameters for detector %s", det.String())
347+
return
348+
}
349+
350+
// Per-detector parameters override any general dcs_sor_parameters
351+
err = mergo.Merge(&detectorArgMap, argMap)
352+
if err != nil {
353+
log.WithError(err).
354+
WithField("detector", det.String()).
355+
Errorf("error building parameter map for detector %s", det.String())
356+
return
357+
}
358+
359+
in.Detectors[i] = &dcspb.DetectorOperationRequest{
360+
Detector: det,
361+
ExtraParameters: detectorArgMap,
362+
}
363+
}
364+
300365
if p.dcsClient == nil {
301366
log.WithError(fmt.Errorf("DCS plugin not initialized")).
302367
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
@@ -317,7 +382,7 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
317382
WithField("endpoint", viper.GetString("dcsServiceEndpoint")).
318383
Error("failed to perform DCS EOR")
319384
}
320-
var dcsEvent *dcspb.Event
385+
var dcsEvent *dcspb.RunEvent
321386
for {
322387
dcsEvent, err = stream.Recv()
323388
if err == io.EOF {
@@ -332,21 +397,24 @@ func (p *Plugin) ObjectStack(data interface{}) (stack map[string]interface{}) {
332397
log.WithError(err).Warn("bad DCS event received")
333398
break
334399
}
335-
if dcsEvent.Eventtype == dcspb.EventType_STATE_CHANGE_EVENT {
336-
if strings.Contains(dcsEvent.Parameters, "EOR_FAILURE") {
337-
log.WithField("event", dcsEvent).Warn("DCS EOR failure")
338-
return
339-
}
340-
if strings.Contains(dcsEvent.Parameters, "RUN_OK") {
341-
log.WithField("event", dcsEvent).Debug("DCS EOR success")
342-
envId, ok := varStack["environment_id"]
343-
if !ok {
344-
break
345-
}
346-
delete(p.pendingEORs, envId)
400+
401+
if dcsEvent.GetState() == dcspb.DetectorState_EOR_FAILURE {
402+
log.WithField("event", dcsEvent).
403+
WithField("detector", dcsEvent.GetDetector().String()).
404+
Warn("DCS EOR failure")
405+
return
406+
}
407+
if dcsEvent.GetState() == dcspb.DetectorState_RUN_OK && dcsEvent.GetDetector() == dcspb.Detector_DCS {
408+
log.WithField("event", dcsEvent).
409+
Debug("DCS EOR success")
410+
envId, ok := varStack["environment_id"]
411+
if !ok {
347412
break
348413
}
414+
delete(p.pendingEORs, envId)
415+
break
349416
}
417+
350418
log.WithField("event", dcsEvent).Debug("incoming DCS EOR event")
351419
}
352420
return

0 commit comments

Comments
 (0)