Skip to content

Commit 3cb776c

Browse files
claireguyotteo
authored andcommitted
[core] Bookkeeping plugin corrects missing SOR timestamps in case they are not set correctly.
1 parent 15ddcac commit 3cb776c

1 file changed

Lines changed: 145 additions & 7 deletions

File tree

core/integration/bookkeeping/plugin.go

Lines changed: 145 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,11 @@ type Plugin struct {
6060

6161
bookkeepingClient *RpcClient
6262

63-
pendingRunStops map[string] /*envId*/ int64
64-
pendingO2Stops map[string] /*envId*/ bool
65-
pendingTrgStops map[string] /*envId*/ bool
63+
pendingRunStops map[string] /*envId*/ int64
64+
pendingO2Starts map[string] /*envId*/ bool
65+
pendingO2Stops map[string] /*envId*/ bool
66+
pendingTrgStarts map[string] /*envId*/ bool
67+
pendingTrgStops map[string] /*envId*/ bool
6668
}
6769

6870
func NewPlugin(endpoint string) integration.Plugin {
@@ -81,7 +83,9 @@ func NewPlugin(endpoint string) integration.Plugin {
8183
bookkeepingPort: portNumber,
8284
bookkeepingClient: nil,
8385
pendingRunStops: make(map[string]int64),
86+
pendingO2Starts: make(map[string]bool),
8487
pendingO2Stops: make(map[string]bool),
88+
pendingTrgStarts: make(map[string]bool),
8589
pendingTrgStops: make(map[string]bool),
8690
}
8791
}
@@ -117,6 +121,21 @@ func (p *Plugin) pendingRunStopsForEnvs(envIds []uid.ID) map[uid.ID]string {
117121
return out
118122
}
119123

124+
func (p *Plugin) pendingO2StartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
125+
if p.pendingO2Starts == nil {
126+
return nil
127+
}
128+
129+
out := make(map[uid.ID]bool)
130+
131+
for _, envId := range envIds {
132+
if o2Start, ok := p.pendingO2Starts[envId.String()]; ok {
133+
out[envId] = o2Start
134+
}
135+
}
136+
return out
137+
}
138+
120139
func (p *Plugin) pendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
121140
if p.pendingO2Stops == nil {
122141
return nil
@@ -132,6 +151,21 @@ func (p *Plugin) pendingO2StopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
132151
return out
133152
}
134153

154+
func (p *Plugin) pendingTrgStartsForEnvs(envIds []uid.ID) map[uid.ID]bool {
155+
if p.pendingTrgStarts == nil {
156+
return nil
157+
}
158+
159+
out := make(map[uid.ID]bool)
160+
161+
for _, envId := range envIds {
162+
if trgStart, ok := p.pendingTrgStarts[envId.String()]; ok {
163+
out[envId] = trgStart
164+
}
165+
}
166+
return out
167+
}
168+
135169
func (p *Plugin) pendingTrgStopsForEnvs(envIds []uid.ID) map[uid.ID]bool {
136170
if p.pendingTrgStops == nil {
137171
return nil
@@ -156,7 +190,9 @@ func (p *Plugin) GetData(_ []any) string {
156190

157191
outMap := make(map[string]interface{})
158192
outMap["pendingRunStops"] = p.pendingRunStopsForEnvs(envIds)
193+
outMap["pendingO2Starts"] = p.pendingO2StartsForEnvs(envIds)
159194
outMap["pendingO2Stops"] = p.pendingO2StopsForEnvs(envIds)
195+
outMap["pendingTrgStarts"] = p.pendingTrgStartsForEnvs(envIds)
160196
outMap["pendingTrgStops"] = p.pendingTrgStopsForEnvs(envIds)
161197

162198
out, err := json.Marshal(outMap)
@@ -172,7 +208,9 @@ func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
172208
}
173209

174210
inRunStopMap := p.pendingRunStopsForEnvs(envIds)
211+
inO2StartMap := p.pendingO2StartsForEnvs(envIds)
175212
inO2StopMap := p.pendingO2StopsForEnvs(envIds)
213+
inTrgStartMap := p.pendingTrgStartsForEnvs(envIds)
176214
inTrgStopMap := p.pendingTrgStopsForEnvs(envIds)
177215

178216
envMap := make(map[string]interface{})
@@ -182,9 +220,15 @@ func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
182220
if runStop, ok := inRunStopMap[envId]; ok {
183221
envMap["runNumber"] = runStop
184222
}
223+
if o2Start, ok := inO2StartMap[envId]; ok {
224+
envMap["pendingO2Start"] = o2Start
225+
}
185226
if o2Stop, ok := inO2StopMap[envId]; ok {
186227
envMap["pendingO2Stop"] = o2Stop
187228
}
229+
if trgStart, ok := inTrgStartMap[envId]; ok {
230+
envMap["pendingTrgStart"] = trgStart
231+
}
188232
if trgStop, ok := inTrgStopMap[envId]; ok {
189233
envMap["pendingTrgStop"] = trgStop
190234
}
@@ -416,7 +460,9 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
416460
return
417461
} else {
418462
p.pendingRunStops[envId] = runNumber64
463+
p.pendingO2Starts[envId] = true
419464
p.pendingO2Stops[envId] = true
465+
p.pendingTrgStarts[envId] = true
420466
p.pendingTrgStops[envId] = true
421467
log.WithField("run", runNumber64).
422468
WithField("partition", envId).
@@ -622,6 +668,42 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
622668
ReadoutCfgUri: &readoutUri,
623669
}
624670

671+
if function, ok := varStack["__call_func"]; ok && strings.Contains(function, "UpdateRunStop") {
672+
if timeO2StartOutput != nil && timeTrgStartOutput == nil {
673+
inRun = bkpb.RunUpdateRequest{
674+
RunNumber: int32(runNumber64),
675+
TimeO2Start: timeO2StartOutput,
676+
TimeO2End: timeO2EndOutput,
677+
TimeTrgEnd: timeTrgEndOutput,
678+
TriggerValue: &trg,
679+
PdpConfigOption: &pdpConfig,
680+
PdpTopologyDescriptionLibraryFile: &pdpTopology,
681+
TfbDdMode: &tfbMode,
682+
LhcPeriod: &lhcPeriod,
683+
OdcTopologyFullName: &odcTopologyFullname,
684+
PdpWorkflowParameters: &pdpParameters,
685+
PdpBeamType: &pdpBeam,
686+
ReadoutCfgUri: &readoutUri,
687+
}
688+
} else if timeO2StartOutput == nil && timeTrgStartOutput != nil {
689+
inRun = bkpb.RunUpdateRequest{
690+
RunNumber: int32(runNumber64),
691+
TimeO2End: timeO2EndOutput,
692+
TimeTrgStart: timeTrgStartOutput,
693+
TimeTrgEnd: timeTrgEndOutput,
694+
TriggerValue: &trg,
695+
PdpConfigOption: &pdpConfig,
696+
PdpTopologyDescriptionLibraryFile: &pdpTopology,
697+
TfbDdMode: &tfbMode,
698+
LhcPeriod: &lhcPeriod,
699+
OdcTopologyFullName: &odcTopologyFullname,
700+
PdpWorkflowParameters: &pdpParameters,
701+
PdpBeamType: &pdpBeam,
702+
ReadoutCfgUri: &readoutUri,
703+
}
704+
}
705+
}
706+
625707
timeout := callable.AcquireTimeout(BKP_RUN_TIMEOUT, varStack, "UpdateRun", envId)
626708
ctx, cancel := context.WithTimeout(context.Background(), timeout)
627709
defer cancel()
@@ -643,7 +725,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
643725
defer delete(p.pendingO2Stops, envId)
644726
defer delete(p.pendingTrgStops, envId)
645727
if p.pendingO2Stops[envId] == true || (trgEnabled && p.pendingTrgStops[envId] == true) {
646-
updatedRun = "INCOMPLETE"
728+
updatedRun = "INCOMPLETE STOP"
647729
if p.pendingO2Stops[envId] == true {
648730
timeO2EndTemp = time.Now().UnixMilli()
649731
timeO2EndOutput = &timeO2EndTemp
@@ -665,7 +747,7 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
665747
}
666748
log.WithField("run", runNumber64).
667749
WithField("partition", envId).
668-
Debug("Bookkeeping API RunServiceClient: Update call: completing missing run end time")
750+
Debug("Bookkeeping API RunServiceClient: Update call: completing missing EOR timestamps")
669751

670752
timeout = callable.AcquireTimeout(BKP_RUN_TIMEOUT, varStack, "UpdateRun", envId)
671753
ctx, cancel = context.WithTimeout(context.Background(), timeout)
@@ -685,8 +767,52 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
685767
} else {
686768
updatedRun = "STOPPED"
687769
}
688-
} else {
689-
updatedRun = "STARTED"
770+
} else if function, ok := varStack["__call_func"]; ok && strings.Contains(function, "UpdateRunStart") {
771+
defer delete(p.pendingO2Starts, envId)
772+
defer delete(p.pendingTrgStarts, envId)
773+
if p.pendingO2Starts[envId] == true || (trgEnabled && p.pendingTrgStarts[envId] == true) {
774+
updatedRun = "INCOMPLETE START"
775+
if p.pendingO2Starts[envId] == true {
776+
timeO2StartTemp = time.Now().UnixMilli()
777+
timeO2StartOutput = &timeO2StartTemp
778+
log.WithField("run", runNumber64).
779+
WithField("partition", envId).
780+
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing O2 start time")
781+
}
782+
if trgEnabled && p.pendingTrgStarts[envId] == true {
783+
timeTrgStartTemp = time.Now().UnixMilli()
784+
timeTrgStartOutput = &timeO2StartTemp
785+
log.WithField("run", runNumber64).
786+
WithField("partition", envId).
787+
Warning("Bookkeeping API RunServiceClient: Update call: run information incomplete, missing Trg start time")
788+
}
789+
inRun = bkpb.RunUpdateRequest{
790+
RunNumber: int32(runNumber64),
791+
TimeO2Start: timeO2StartOutput,
792+
TimeTrgStart: timeTrgStartOutput,
793+
}
794+
log.WithField("run", runNumber64).
795+
WithField("partition", envId).
796+
Debug("Bookkeeping API RunServiceClient: Update call: completing missing SOR timestamps")
797+
798+
timeout = callable.AcquireTimeout(BKP_RUN_TIMEOUT, varStack, "UpdateRun", envId)
799+
ctx, cancel = context.WithTimeout(context.Background(), timeout)
800+
defer cancel()
801+
_, err = p.bookkeepingClient.RunServiceClient.Update(ctx, &inRun, grpc.EmptyCallOption{})
802+
if err != nil {
803+
log.WithError(err).
804+
WithField("run", runNumber64).
805+
WithField("partition", envId).
806+
WithField("call", "RunServiceClient.Update").
807+
Error("Bookkeeping API RunServiceClient: Update error")
808+
809+
call.VarStack["__call_error_reason"] = err.Error()
810+
call.VarStack["__call_error"] = callFailedStr
811+
return
812+
}
813+
} else {
814+
updatedRun = "STARTED"
815+
}
690816
}
691817
log.WithField("run", runNumber64).
692818
WithField("updated to", updatedRun).
@@ -728,7 +854,13 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
728854
}
729855

730856
O2StartTime := varStack["run_start_time_ms"]
857+
if O2StartTime != "" {
858+
p.pendingO2Starts[envId] = false
859+
}
731860
TrgStartTime := varStack["trg_start_time_ms"]
861+
if TrgStartTime != "" {
862+
p.pendingTrgStarts[envId] = false
863+
}
732864

733865
return updateRunFunc(runNumber64, "test", O2StartTime, "", TrgStartTime, "")
734866
}
@@ -770,12 +902,18 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
770902
}
771903

772904
O2StartTime := varStack["run_start_time_ms"]
905+
if O2StartTime != "" {
906+
p.pendingO2Starts[envId] = false
907+
}
773908
O2EndTime := varStack["run_end_time_ms"]
774909
if O2EndTime != "" {
775910
p.pendingO2Stops[envId] = false
776911
}
777912

778913
TrgStartTime := varStack["trg_start_time_ms"]
914+
if TrgStartTime != "" {
915+
p.pendingTrgStarts[envId] = false
916+
}
779917
TrgEndTime := varStack["trg_end_time_ms"]
780918
if TrgEndTime != "" {
781919
p.pendingTrgStops[envId] = false

0 commit comments

Comments
 (0)