@@ -1457,52 +1457,35 @@ func (qjm *XController) UpdateAgent() {
14571457 }
14581458}
14591459
1460+ //Move AW from Running to Completed or RunningHoldCompletion
14601461func (qjm * XController ) UpdateQueueJobs () {
1461- firstTime := metav1 .NowMicro ()
14621462 // retrieve queueJobs from local cache. no guarantee queueJobs contain up-to-date information
14631463 queueJobs , err := qjm .appWrapperLister .AppWrappers ("" ).List (labels .Everything ())
14641464 if err != nil {
14651465 klog .Errorf ("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v" , err )
14661466 return
14671467 }
14681468 for _ , newjob := range queueJobs {
1469- // UpdateQueueJobs can be the first to see a new AppWrapper job, under heavy load
1470- if newjob .Status .QueueJobState == "" {
1471- newjob .Status .ControllerFirstTimestamp = firstTime
1472- newjob .Status .SystemPriority = float64 (newjob .Spec .Priority )
1473- newjob .Status .QueueJobState = arbv1 .AppWrapperCondInit
1474- newjob .Status .Conditions = []arbv1.AppWrapperCondition {
1475- {
1476- Type : arbv1 .AppWrapperCondInit ,
1477- Status : v1 .ConditionTrue ,
1478- LastUpdateMicroTime : metav1 .NowMicro (),
1479- LastTransitionMicroTime : metav1 .NowMicro (),
1480- },
1481- }
1482- klog .V (6 ).Infof ("[UpdateQueueJobs] Found new appwraper '%s/%s' 0Delay=%.6f seconds CreationTimestamp=%s ControllerFirstTimestamp=%s" ,
1483- newjob .Namespace , newjob .Name , time .Now ().Sub (newjob .Status .ControllerFirstTimestamp .Time ).Seconds (), newjob .CreationTimestamp , newjob .Status .ControllerFirstTimestamp )
1484- }
1485- // only set if appwrapper is running and dispatch time is not set previously
1486- if newjob .Status .QueueJobState == "Running" && newjob .Status .ControllerFirstDispatchTimestamp .String () == "0001-01-01 00:00:00 +0000 UTC" {
1487- newjob .Status .ControllerFirstDispatchTimestamp = firstTime
1488- }
1489-
1490- klog .V (6 ).Infof ("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v" , newjob .Name , qjm .qjqueue .IfExist (newjob ), newjob , newjob .ResourceVersion , newjob .Status )
1491- // check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
1492- if _ , exists , _ := qjm .eventQueue .Get (newjob ); exists {
1493- klog .V (6 ).Infof ("[UpdateQueueJobs] app wrapper %s/%s found in the event queue, not adding it" , newjob .Namespace , newjob .Name )
1494- continue
1495- } // do not enqueue if already in eventQueue
1496- if qjm .qjqueue .IfExist (newjob ) {
1497- klog .V (6 ).Infof ("[UpdateQueueJobs] app wrapper %s/%s found in the job queue, not adding it" , newjob .Namespace , newjob .Name )
1498- continue
1499- } // do not enqueue if already in qjqueue
1500-
1501- err = qjm .enqueueIfNotPresent (newjob )
1502- if err != nil {
1503- klog .Errorf ("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v" , newjob .Name , time .Now ().Sub (newjob .Status .ControllerFirstTimestamp .Time ).Seconds (), newjob , newjob .ResourceVersion , newjob .Status , err )
1504- } else {
1505- klog .V (6 ).Infof ("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v" , newjob .Name , time .Now ().Sub (newjob .Status .ControllerFirstTimestamp .Time ).Seconds (), newjob , newjob .ResourceVersion , newjob .Status )
1469+ if newjob .Status .State == arbv1 .AppWrapperStateActive && newjob .GetDeletionTimestamp () != nil {
1470+ klog .V (6 ).Infof ("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v" , newjob .Name , qjm .qjqueue .IfExist (newjob ), newjob , newjob .ResourceVersion , newjob .Status )
1471+ // check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
1472+ if _ , exists , _ := qjm .eventQueue .Get (newjob ); exists {
1473+ klog .V (6 ).Infof ("[UpdateQueueJobs] app wrapper %s/%s found in the event queue, not adding it" , newjob .Namespace , newjob .Name )
1474+ continue
1475+ } // do not enqueue if already in eventQueue
1476+ if qjm .qjqueue .IfExist (newjob ) {
1477+ klog .V (6 ).Infof ("[UpdateQueueJobs] app wrapper %s/%s found in the job queue, not adding it" , newjob .Namespace , newjob .Name )
1478+ continue
1479+ } // do not enqueue if already in qjqueue
1480+ //Remove stale copy
1481+ qjm .eventQueue .Delete (newjob )
1482+ //Add fresh copy
1483+ err = qjm .enqueueIfNotPresent (newjob )
1484+ if err != nil {
1485+ klog .Errorf ("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v" , newjob .Name , time .Now ().Sub (newjob .Status .ControllerFirstTimestamp .Time ).Seconds (), newjob , newjob .ResourceVersion , newjob .Status , err )
1486+ } else {
1487+ klog .V (6 ).Infof ("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v" , newjob .Name , time .Now ().Sub (newjob .Status .ControllerFirstTimestamp .Time ).Seconds (), newjob , newjob .ResourceVersion , newjob .Status )
1488+ }
15061489 }
15071490 }
15081491}
0 commit comments