Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,9 @@ public Optional<BestFit> findBestFit(TaskExecutorBatchAssignmentRequest request)
}

if (noResourcesAvailable) {
log.warn("Not all scheduling constraints had enough workers available to fulfill the request {}", request);
log.warn("Not all scheduling constraints had enough workers for jobId={}, cluster={}, constraints={}",
request.getJobId(), request.getClusterID(),
request.getGroupedBySchedulingConstraints().keySet());
return Optional.empty();
} else {
// Return best fit only if there are enough available TEs for all scheduling constraints
Expand All @@ -393,28 +395,41 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findBestFitFor(TaskExec
return Optional.empty();
}

Stream<TaskExecutorHolder> availableTEs = this.executorsByGroup.get(bestFitTeGroupKey.get())
NavigableSet<TaskExecutorHolder> groupHolders = this.executorsByGroup.get(bestFitTeGroupKey.get());

List<TaskExecutorHolder> availableTEList = groupHolders
.descendingSet()
.stream()
.filter(teHolder -> {
if (!this.taskExecutorStateMap.containsKey(teHolder.getId())) {
log.debug("findBestFitFor: TE {} excluded - not in stateMap", teHolder.getId());
return false;
}
if (currentBestFit.contains(teHolder.getId())) {
log.debug("findBestFitFor: TE {} excluded - already in bestFit", teHolder.getId());
return false;
}
TaskExecutorState st = this.taskExecutorStateMap.get(teHolder.getId());
return st.isAvailable() &&
// when a TE is returned from here to be used for scheduling, its state remain active until
// the scheduler trigger another message to update (lock) the state. However when large number
// of the requests are active at the same time on same sku, the gap between here and the message
// to lock the state can be large so another schedule request message can be in between and
// got the same set of TEs. To avoid this, a lease is added to each TE state to temporarily
// lock the TE to be used again. Since this is only lock between actor messages and lease
// duration can be short.
st.getLastSchedulerLeasedDuration().compareTo(this.schedulerLeaseExpirationDuration) > 0 &&
st.getRegistration() != null;
});
if (!st.isAvailable()) {
log.debug("findBestFitFor: TE {} excluded - not available", teHolder.getId());
return false;
}
if (st.getLastSchedulerLeasedDuration().compareTo(this.schedulerLeaseExpirationDuration) <= 0) {
log.debug("findBestFitFor: TE {} excluded - lease not expired ({}ms)", teHolder.getId(), st.getLastSchedulerLeasedDuration().toMillis());
return false;
}
if (st.getRegistration() == null) {
log.debug("findBestFitFor: TE {} excluded - no registration", teHolder.getId());
return false;
}
return true;
})
.collect(Collectors.toList());
Comment thread
hellolittlej marked this conversation as resolved.

log.info("findBestFitFor: group={}, holdersInGroup={}, availableAfterFilter={}, requested={}",
bestFitTeGroupKey.get(), groupHolders.size(), availableTEList.size(), numWorkers);

Stream<TaskExecutorHolder> availableTEs = availableTEList.stream();

if(availableTaskExecutorMutatorHook != null) {
availableTEs = availableTaskExecutorMutatorHook.mutate(availableTEs, request, schedulingConstraints);
Expand Down Expand Up @@ -642,6 +657,7 @@ private Map<String, Integer> mapReservationsToSku(

Map<String, Integer> reservationCountBySku = new HashMap<>();

log.info("mapReservationsToSku called with {} reservations", pendingReservations == null ? 0 : pendingReservations.size());
if (pendingReservations == null || pendingReservations.isEmpty()) {
return reservationCountBySku;
}
Expand Down Expand Up @@ -783,14 +799,12 @@ private Optional<Map<TaskExecutorID, TaskExecutorState>> findTaskExecutorsFor(Ta
if (taskExecutors.isPresent() && taskExecutors.get().size() == allocationRequests.size()) {
return taskExecutors;
} else {
log.warn("Not enough available TEs found for scheduling constraints {}, request: {}", schedulingConstraints, request);
if (taskExecutors.isPresent()) {
log.debug("Found {} Task Executors: {} for request: {} with constraints: {}",
taskExecutors.get().size(), taskExecutors.get(), request, schedulingConstraints);
} else {
log.warn("No suitable Task Executors found for request: {} with constraints: {}",
request, schedulingConstraints);
}
log.warn("Not enough available TEs for constraints {} (found={}, needed={}, jobId={}, workerId={})",
schedulingConstraints,
taskExecutors.isPresent() ? taskExecutors.get().size() : 0,
allocationRequests.size(),
request.getJobId(),
allocationRequests.stream().map(a -> a.getWorkerId().toString()).collect(Collectors.joining(",")));

// If there are not enough workers with the given spec then add the request the pending ones
if (!isJobIdAlreadyPending && request.getAllocationRequests().size() > 2) {
Expand Down
Loading