Skip to content

Commit 1e130e5

Browse files
asm582openshift-merge-robot
authored andcommitted
refactor updatequeuejobs method to avoid using queues
1 parent 21c9186 commit 1e130e5

File tree

1 file changed

+76
-86
lines changed

1 file changed

+76
-86
lines changed

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 76 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,8 +1435,8 @@ func (cc *XController) Run(stopCh <-chan struct{}) {
14351435
// start preempt thread based on preemption of pods
14361436
go wait.Until(cc.PreemptQueueJobs, 60*time.Second, stopCh)
14371437

1438-
// This thread is used as a heartbeat to calculate runtime spec in the status
1439-
go wait.Until(cc.UpdateQueueJobs, 5*time.Second, stopCh)
1438+
// This thread is used to update AW that has completionstatus set to Complete or RunningHoldCompletion
1439+
go wait.Until(cc.UpdateQueueJobs, 10*time.Second, stopCh)
14401440

14411441
if cc.isDispatcher {
14421442
go wait.Until(cc.UpdateAgent, 2*time.Second, stopCh) // In the Agent?
@@ -1457,35 +1457,89 @@ func (qjm *XController) UpdateAgent() {
14571457
}
14581458
}
14591459

1460-
//Move AW from Running to Completed or RunningHoldCompletion
1460+
// Move AW from Running to Completed or RunningHoldCompletion
1461+
// Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state.
1462+
// State transition: Running->RunningHoldCompletion->Completed
14611463
func (qjm *XController) UpdateQueueJobs() {
1462-
// retrieve queueJobs from local cache. no guarantee queueJobs contain up-to-date information
14631464
queueJobs, err := qjm.appWrapperLister.AppWrappers("").List(labels.Everything())
14641465
if err != nil {
14651466
klog.Errorf("[UpdateQueueJobs] Failed to get a list of active appwrappers, err=%+v", err)
14661467
return
14671468
}
1469+
containsCompletionStatus := false
14681470
for _, newjob := range queueJobs {
1469-
if newjob.Status.State == arbv1.AppWrapperStateActive {
1470-
klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
1471-
// check eventQueue, qjqueue in program sequence to make sure job is not in qjqueue
1472-
if _, exists, _ := qjm.eventQueue.Get(newjob); exists {
1473-
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the event queue, not adding it", newjob.Namespace, newjob.Name)
1474-
continue
1475-
} // do not enqueue if already in eventQueue
1476-
if qjm.qjqueue.IfExist(newjob) {
1477-
klog.V(6).Infof("[UpdateQueueJobs] app wrapper %s/%s found in the job queue, not adding it", newjob.Namespace, newjob.Name)
1478-
continue
1479-
} // do not enqueue if already in qjqueue
1480-
//Remove stale copy
1481-
qjm.eventQueue.Delete(newjob)
1482-
//Add fresh copy
1483-
err = qjm.enqueueIfNotPresent(newjob)
1471+
for _, item := range newjob.Spec.AggrResources.GenericItems {
1472+
if len(item.CompletionStatus) > 0 {
1473+
containsCompletionStatus = true
1474+
}
1475+
}
1476+
if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus {
1477+
err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(newjob)
14841478
if err != nil {
1485-
klog.Errorf("[UpdateQueueJobs] Fail to enqueue %s to eventQueue, ignore. *Delay=%.6f seconds &qj=%p Version=%s Status=%+v err=%#v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status, err)
1486-
} else {
1487-
klog.V(6).Infof("[UpdateQueueJobs] %s *Delay=%.6f seconds eventQueue.Add_byUpdateQueueJobs &qj=%p Version=%s Status=%+v", newjob.Name, time.Now().Sub(newjob.Status.ControllerFirstTimestamp.Time).Seconds(), newjob, newjob.ResourceVersion, newjob.Status)
1479+
klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
1480+
continue
1481+
}
1482+
klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
1483+
// set appwrapper status to Complete or RunningHoldCompletion
1484+
derivedAwStatus := qjm.getAppWrapperCompletionStatus(newjob)
1485+
1486+
klog.Infof("[UpdateQueueJobs] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, newjob.Namespace, newjob.Name, newjob.ResourceVersion,
1487+
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
1488+
1489+
// Set Appwrapper state to complete if all items in Appwrapper
1490+
// are completed
1491+
if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion {
1492+
newjob.Status.State = derivedAwStatus
1493+
var updateQj *arbv1.AppWrapper
1494+
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted")
1495+
if index < 0 {
1496+
newjob.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion
1497+
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
1498+
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
1499+
newjob.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
1500+
updateQj = newjob.DeepCopy()
1501+
} else {
1502+
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
1503+
newjob.Status.Conditions[index] = *cond.DeepCopy()
1504+
updateQj = newjob.DeepCopy()
1505+
}
1506+
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setRunningHoldCompletion")
1507+
if err != nil {
1508+
//TODO: implement retry
1509+
klog.Errorf("[UpdateQueueJobs] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
1510+
}
1511+
}
1512+
// Set appwrapper status to complete
1513+
if derivedAwStatus == arbv1.AppWrapperStateCompleted {
1514+
newjob.Status.State = derivedAwStatus
1515+
newjob.Status.CanRun = false
1516+
var updateQj *arbv1.AppWrapper
1517+
index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondCompleted, "PodsCompleted")
1518+
if index < 0 {
1519+
newjob.Status.QueueJobState = arbv1.AppWrapperCondCompleted
1520+
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
1521+
newjob.Status.Conditions = append(newjob.Status.Conditions, cond)
1522+
newjob.Status.FilterIgnore = true // Update AppWrapperCondCompleted
1523+
updateQj = newjob.DeepCopy()
1524+
} else {
1525+
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
1526+
newjob.Status.Conditions[index] = *cond.DeepCopy()
1527+
updateQj = newjob.DeepCopy()
1528+
}
1529+
err := qjm.updateStatusInEtcdWithRetry(context.Background(), updateQj, "[UpdateQueueJobs] setCompleted")
1530+
if err != nil {
1531+
if qjm.quotaManager != nil {
1532+
qjm.quotaManager.Release(updateQj)
1533+
}
1534+
//TODO: Implement retry
1535+
klog.Errorf("[UpdateQueueJobs] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
1536+
}
1537+
if qjm.quotaManager != nil {
1538+
qjm.quotaManager.Release(updateQj)
1539+
}
14881540
}
1541+
klog.Infof("[UpdateQueueJobs] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", newjob.Namespace, newjob.Name, newjob.ResourceVersion,
1542+
newjob.Status.CanRun, newjob.Status.State, newjob.Status.Pending, newjob.Status.Running, newjob.Status.Succeeded, newjob.Status.Failed)
14891543
}
14901544
}
14911545
}
@@ -1864,9 +1918,6 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
18641918
klog.Errorf("manageQueueJob] Failed to add '%s/%s' to activeQueue. Back to eventQueue activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v err=%#v",
18651919
qj.Namespace, qj.Name, cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, err00)
18661920
cc.enqueue(qj)
1867-
} else {
1868-
klog.V(3).Infof("[manageQueueJob] Added '%s/%s' to activeQueue queue 1Delay=%.6f seconds activeQ.Add_success activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v",
1869-
qj.Namespace, qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), cc.qjqueue.IfExistActiveQ(qj), cc.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
18701921
}
18711922
return nil
18721923
}
@@ -1952,67 +2003,6 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
19522003
klog.Infof("[manageQueueJob] Getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
19532004
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)
19542005

1955-
// set appwrapper status to Complete or RunningHoldCompletion
1956-
derivedAwStatus := cc.getAppWrapperCompletionStatus(qj)
1957-
1958-
klog.Infof("[manageQueueJob] Got completion status '%s' for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", derivedAwStatus, qj.Namespace, qj.Name, qj.ResourceVersion,
1959-
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)
1960-
1961-
// Set Appwrapper state to complete if all items in Appwrapper
1962-
// are completed
1963-
if derivedAwStatus == arbv1.AppWrapperStateRunningHoldCompletion {
1964-
qj.Status.State = derivedAwStatus
1965-
var updateQj *arbv1.AppWrapper
1966-
index := getIndexOfMatchedCondition(qj, arbv1.AppWrapperCondRunningHoldCompletion, "SomeItemsCompleted")
1967-
if index < 0 {
1968-
qj.Status.QueueJobState = arbv1.AppWrapperCondRunningHoldCompletion
1969-
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
1970-
qj.Status.Conditions = append(qj.Status.Conditions, cond)
1971-
qj.Status.FilterIgnore = true // Update AppWrapperCondRunningHoldCompletion
1972-
updateQj = qj.DeepCopy()
1973-
} else {
1974-
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondRunningHoldCompletion, v1.ConditionTrue, "SomeItemsCompleted", "")
1975-
qj.Status.Conditions[index] = *cond.DeepCopy()
1976-
updateQj = qj.DeepCopy()
1977-
}
1978-
err := cc.updateStatusInEtcdWithRetry(ctx, updateQj, "[manageQueueJob] setRunningHoldCompletion")
1979-
if err != nil {
1980-
klog.Errorf("[manageQueueJob] Error updating status 'setRunningHoldCompletion' for AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err)
1981-
return err
1982-
}
1983-
}
1984-
// Set appwrapper status to complete
1985-
if derivedAwStatus == arbv1.AppWrapperStateCompleted {
1986-
qj.Status.State = derivedAwStatus
1987-
qj.Status.CanRun = false
1988-
var updateQj *arbv1.AppWrapper
1989-
index := getIndexOfMatchedCondition(qj, arbv1.AppWrapperCondCompleted, "PodsCompleted")
1990-
if index < 0 {
1991-
qj.Status.QueueJobState = arbv1.AppWrapperCondCompleted
1992-
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
1993-
qj.Status.Conditions = append(qj.Status.Conditions, cond)
1994-
qj.Status.FilterIgnore = true // Update AppWrapperCondCompleted
1995-
updateQj = qj.DeepCopy()
1996-
} else {
1997-
cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondCompleted, v1.ConditionTrue, "PodsCompleted", "")
1998-
qj.Status.Conditions[index] = *cond.DeepCopy()
1999-
updateQj = qj.DeepCopy()
2000-
}
2001-
err := cc.updateStatusInEtcdWithRetry(ctx, updateQj, "[manageQueueJob] setCompleted")
2002-
if err != nil {
2003-
if cc.quotaManager != nil {
2004-
cc.quotaManager.Release(updateQj)
2005-
}
2006-
klog.Errorf("[manageQueueJob] Error updating status 'setCompleted' AppWrapper: '%s/%s',Status=%+v, err=%+v.", qj.Namespace, qj.Name, qj.Status, err)
2007-
return err
2008-
}
2009-
if cc.quotaManager != nil {
2010-
cc.quotaManager.Release(updateQj)
2011-
}
2012-
}
2013-
klog.Infof("[manageQueueJob] Done getting completion status for app wrapper '%s/%s' Version=%s Status.CanRun=%t Status.State=%s, pod counts [Pending: %d, Running: %d, Succeded: %d, Failed %d]", qj.Namespace, qj.Name, qj.ResourceVersion,
2014-
qj.Status.CanRun, qj.Status.State, qj.Status.Pending, qj.Status.Running, qj.Status.Succeeded, qj.Status.Failed)
2015-
20162006
} else if podPhaseChanges { // Continued bug fix
20172007
// Only update etcd if AW status has changed. This can happen for periodic
20182008
// updates of pod phase counts done in caller of this function.

0 commit comments

Comments
 (0)