Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 100 additions & 95 deletions cuebot/src/main/java/com/imageworks/spcue/VirtualProc.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,37 @@ public String getName() {
* on memory usage.
*
* Fast mode books all the idle cores on the host at one time.
*
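* @param host the host the proc is booked on
* @param frame the frame the proc will run
* @param selfishServices services for which a threadable frame is offered every whole idle core
* on the host (still subject to the frame's max cores)
* @return the proc with its core reservation computed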
*/
public static final VirtualProc build(DispatchHost host, DispatchFrame frame,
        String... selfishServices) {
    VirtualProc proc = seedProc(host, frame);

    // Give away all existing stranded cores to drift host back to a balanced state
    // Read "docs > User Guides > Stranded Cores in Cuebot" for more info
    if (host.strandedCores > 0) {
        proc.coresReserved += host.strandedCores;
    }
    // Evaluated against the request as it stands after stranded cores were added.
    proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved);

    int requested = proc.coresReserved;
    if (requested == 0) {
        // A request of zero means "book every core on the host".
        logger.debug("Reserving all cores");
        proc.coresReserved = host.cores;
    } else if (requested < 0) {
        // requested is negative in this branch, so log its magnitude; concatenating the raw
        // value produced the misleading text "minus -N".
        logger.debug("Reserving all cores minus " + (-requested));
        proc.coresReserved = host.cores + requested;
    } else if (requested >= 100) {
        proc.coresReserved =
                computeMultiCoreReservation(host, frame, requested, selfishServices);
    }
    // Requests between 1 and 99 fall through with the reservation left untouched.

    // Don't thread non-threadable layers, no matter what was requested for the number of cores.
    if (!frame.threadable && proc.coresReserved > 100) {
        proc.coresReserved = 100;
    }
    return proc;
}

private static VirtualProc seedProc(DispatchHost host, DispatchFrame frame) {
VirtualProc proc = new VirtualProc();
proc.allocationId = host.getAllocationId();
proc.hostId = host.getHostId();
Expand All @@ -101,108 +125,89 @@ public static final VirtualProc build(DispatchHost host, DispatchFrame frame,
proc.memoryReserved = frame.getMinMemory();
proc.gpusReserved = frame.minGpus;
proc.gpuMemoryReserved = frame.minGpuMemory;
return proc;
}

/*
* Frames that are announcing cores less than 100 are not multi-threaded so there is no
* reason for the frame to span more than a single core.
*
* If we are in "fast mode", we just book all the cores. If the host is nimby, desktops are
* automatically fast mode.
*/

if (host.strandedCores > 0) {
proc.coresReserved = proc.coresReserved + host.strandedCores;
}

proc.canHandleNegativeCoresRequest = host.canHandleNegativeCoresRequest(proc.coresReserved);
private static int computeMultiCoreReservation(DispatchHost host, DispatchFrame frame,
int requested, String[] selfishServices) {
int wholeCores = wholeIdleCores(host);
requireWholeCore(wholeCores, frame);

if (proc.coresReserved == 0) {
logger.debug("Reserving all cores");
proc.coresReserved = host.cores;
} else if (proc.coresReserved < 0) {
logger.debug("Reserving all cores minus " + proc.coresReserved);
proc.coresReserved = host.cores + proc.coresReserved;
} else if (proc.coresReserved >= 100) {

int originalCores = proc.coresReserved;

/*
* wholeCores could be 0 if we have a fraction of a core, we can just throw a re
*/
int wholeCores = (int) (Math.floor(host.idleCores / 100.0));
if (wholeCores == 0) {
throw new EntityException("The host had only a fraction of a core remaining "
+ "but the frame required " + frame.minCores);
}
int cores = baseReservationByMode(host, frame, wholeCores, requested, selfishServices);
cores = enforceVariableFloor(host, frame, cores);
cores = applySanityBounds(cores, requested, frame.maxCores);
cores = clampToIdle(host, frame, cores, wholeCores);
return cores;
}

// if (host.threadMode == ThreadMode.Variable.value() &&
// CueUtil.isDayTime()) {
if (host.threadMode == ThreadMode.ALL_VALUE) {
proc.coresReserved = wholeCores * 100;
} else {
if (frame.threadable) {
if (selfishServices != null && frame.services != null
&& containsSelfishService(frame.services.split(","), selfishServices)) {
proc.coresReserved = wholeCores * 100;
} else {
if (host.idleMemory
- frame.getMinMemory() <= Dispatcher.MEM_STRANDED_THRESHHOLD) {
proc.coresReserved = wholeCores * 100;
} else {
proc.coresReserved =
getCoreSpan(host, frame.getMinMemory(), frame.maxCores);
}
}
if (host.threadMode == ThreadMode.VARIABLE_VALUE && proc.coresReserved <= 200) {
proc.coresReserved = 200;
if (proc.coresReserved > host.idleCores) {
// Do not allow threadable frame running on 1 core.
throw new JobDispatchException(
"Do not allow threadable frame running one core on a ThreadMode.Variable host.");
}
}
}
}
/**
 * Returns the number of whole idle cores on the host: {@code host.idleCores} divided by 100,
 * rounded down.
 */
private static int wholeIdleCores(DispatchHost host) {
    final double idleCoreCount = host.idleCores / 100.0;
    return (int) Math.floor(idleCoreCount);
}

/*
* Sanity checks to ensure coreUnits are not too high or too low.
*/
if (proc.coresReserved < 100) {
proc.coresReserved = 100;
}
/**
 * Rejects a booking when the host has no whole idle core left.
 *
 * @param wholeCores number of whole idle cores available on the host
 * @param frame the frame being booked; its {@code minCores} is reported in the error
 * @throws EntityException if {@code wholeCores} is 0
 */
private static void requireWholeCore(int wholeCores, DispatchFrame frame) {
    if (wholeCores != 0) {
        return;
    }
    final String reason = "The host had only a fraction of a core remaining "
            + "but the frame required " + frame.minCores;
    throw new EntityException(reason);
}

/*
* If the core value is changed it can never fall below the original.
*/
if (proc.coresReserved < originalCores) {
proc.coresReserved = originalCores;
}
/**
 * Picks the starting core reservation, before any floor or bounds are applied.
 *
 * Every whole idle core is taken when the host thread mode is ALL, when the frame runs a
 * selfish service, or when the idle memory left after the frame's minimum memory is at or below
 * {@code Dispatcher.MEM_STRANDED_THRESHHOLD}. Non-threadable frames keep their requested value.
 * Any other threadable frame gets the span computed by {@code getCoreSpan}.
 *
 * @param host the host being booked
 * @param frame the frame being booked
 * @param wholeCores number of whole idle cores on the host
 * @param requested the core units requested for the frame
 * @param selfishServices names of selfish services; may be null
 * @return the starting reservation in core units
 */
private static int baseReservationByMode(DispatchHost host, DispatchFrame frame, int wholeCores,
        int requested, String[] selfishServices) {
    final int everyWholeCore = wholeCores * 100;

    if (host.threadMode == ThreadMode.ALL_VALUE) {
        return everyWholeCore;
    }
    if (!frame.threadable) {
        return requested;
    }

    // The selfish-service check comes first; the memory check only runs when it is false.
    final boolean bookEverything = isSelfishService(frame, selfishServices)
            || host.idleMemory - frame.getMinMemory() <= Dispatcher.MEM_STRANDED_THRESHHOLD;

    return bookEverything ? everyWholeCore
            : getCoreSpan(host, frame.getMinMemory(), frame.maxCores);
}

/*
* Check to ensure we haven't exceeded max cores.
*/
if (frame.maxCores > 0 && proc.coresReserved > frame.maxCores) {
proc.coresReserved = frame.maxCores;
}
/**
 * Raises the reservation of a threadable frame on a ThreadMode.Variable host to 200 core units.
 *
 * The value is returned unchanged when the host is not in variable mode, the frame is not
 * threadable, or the reservation is already above 200.
 *
 * @param host the host being booked
 * @param frame the frame being booked
 * @param cores the reservation computed so far, in core units
 * @return {@code cores} unchanged, or 200 when the floor applies
 * @throws JobDispatchException if the floor applies but the host has fewer than 200 idle core
 *         units
 */
private static int enforceVariableFloor(DispatchHost host, DispatchFrame frame, int cores) {
    final int minimumCores = 200;

    final boolean floorApplies = host.threadMode == ThreadMode.VARIABLE_VALUE
            && frame.threadable && cores <= minimumCores;
    if (!floorApplies) {
        return cores;
    }

    if (host.idleCores < minimumCores) {
        // Do not allow threadable frame running on 1 core.
        throw new JobDispatchException(
                "Do not allow threadable frame running one core on a ThreadMode.Variable host.");
    }
    return minimumCores;
}

if (proc.coresReserved > host.idleCores) {
if (host.threadMode == ThreadMode.VARIABLE_VALUE && frame.threadable
&& wholeCores == 1) {
throw new JobDispatchException(
"Do not allow threadable frame running one core on a ThreadMode.Variable host.");
}
proc.coresReserved = wholeCores * 100;
}
/**
 * Bounds a core reservation: at least 100, never below the original request, and capped at
 * {@code maxCores} when that cap is positive. The cap is applied last, so it wins over the two
 * lower bounds.
 *
 * @param cores the reservation computed so far, in core units
 * @param originalCores the originally requested core units
 * @param maxCores upper limit in core units; values of 0 or less mean no limit
 * @return the bounded reservation
 */
private static int applySanityBounds(int cores, int originalCores, int maxCores) {
    int bounded = Math.max(Math.max(cores, 100), originalCores);
    if (maxCores > 0) {
        bounded = Math.min(bounded, maxCores);
    }
    return bounded;
}

/*
* Don't thread non-threadable layers, no matter what people put for the number of cores.
*/
if (!frame.threadable && proc.coresReserved > 100) {
proc.coresReserved = 100;
/**
 * Keeps the reservation within the host's idle cores.
 *
 * A reservation that fits in {@code host.idleCores} is returned unchanged; otherwise it is
 * reduced to every whole idle core.
 *
 * @param host the host being booked
 * @param frame the frame being booked
 * @param cores the reservation computed so far, in core units
 * @param wholeCores number of whole idle cores on the host
 * @return the reservation, no larger than the whole idle cores when it had to be reduced
 * @throws JobDispatchException if the reservation must be reduced, the host is in variable
 *         thread mode, the frame is threadable, and only one whole core is idle
 */
private static int clampToIdle(DispatchHost host, DispatchFrame frame, int cores,
        int wholeCores) {
    final boolean exceedsIdle = cores > host.idleCores;
    if (!exceedsIdle) {
        return cores;
    }

    final boolean threadableOnSingleCore = host.threadMode == ThreadMode.VARIABLE_VALUE
            && frame.threadable && wholeCores == 1;
    if (threadableOnSingleCore) {
        throw new JobDispatchException(
                "Do not allow threadable frame running one core on a ThreadMode.Variable host.");
    }
    return wholeCores * 100;
}

return proc;
/**
 * Tells whether the frame runs one of the selfish services.
 *
 * @param frame the frame whose comma-separated {@code services} string is inspected
 * @param selfishServices names of selfish services; may be null
 * @return false when either list is null, otherwise the result of
 *         {@code containsSelfishService} on the split frame services
 */
private static boolean isSelfishService(DispatchFrame frame, String[] selfishServices) {
    if (selfishServices == null || frame.services == null) {
        return false;
    }
    final String[] frameServices = frame.services.split(",");
    return containsSelfishService(frameServices, selfishServices);
}

private static final boolean containsSelfishService(String[] frameServices,
Expand Down
Loading
Loading