Commit 63f642ae authored by Ryan Blue, committed by Marcelo Vanzin

[SPARK-13779][YARN] Avoid cancelling non-local container requests.

To maximize locality, the YarnAllocator would cancel any requests with a
stale locality preference or no locality preference. This assumed that
the majority of tasks had locality preferences, but that may not be the
case when scanning S3. As a result, container requests for S3 tasks were
constantly cancelled and resubmitted.

This changes the allocator's logic to cancel only stale requests, plus
just enough no-locality requests to make room for new requests that do
have a locality preference. It no longer cancels no-locality requests
that would simply be resubmitted, again without a locality preference.
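
For illustration, here is a minimal, standalone sketch of the new accounting.
It uses plain made-up counts instead of the actual ContainerRequest objects
tracked through amClient, so all names and numbers below are hypothetical:

object AllocatorAccountingSketch {
  def main(args: Array[String]): Unit = {
    val missing = 10              // executors still needed
    val staleRequests = 4         // pending requests whose preferred locations are no longer wanted
    val anyHostRequests = 20      // pending requests with no locality preference
    val newLocalityRequests = 12  // locality-aware requests produced by the placement strategy

    // Stale requests are always cancelled, so their slots become available again.
    val availableContainers = missing + staleRequests

    if (availableContainers >= newLocalityRequests) {
      // Enough room: keep every existing any-host request and top up with new ones.
      val anyHostToAdd = availableContainers - newLocalityRequests
      println(s"submit $newLocalityRequests locality-aware and $anyHostToAdd any-host requests")
    } else {
      // Not enough room: cancel only as many any-host requests as the new
      // locality-aware requests actually need.
      val numToCancel = newLocalityRequests - availableContainers
      println(s"cancel $numToCancel of $anyHostRequests any-host requests, " +
        s"then submit $newLocalityRequests locality-aware requests")
    }
  }
}

Under the old logic, all 20 any-host requests in this example would have been
cancelled and immediately resubmitted without a locality preference.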

We've deployed this patch on our clusters and verified that jobs that previously could not get executors, because their requests were constantly cancelled and resubmitted, now run correctly. Large jobs are running fine.

Author: Ryan Blue <blue@apache.org>

Closes #11612 from rdblue/SPARK-13779-fix-yarn-allocator-requests.
parent 45f8053b
@@ -265,25 +265,52 @@ private[yarn] class YarnAllocator(
       // For locality unmatched and locality free container requests, cancel these container
       // requests, since required locality preference has been changed, recalculating using
       // container placement strategy.
-      val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
+      val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
         hostToLocalTaskCounts, pendingAllocate)
 
-      // Remove the outdated container request and recalculate the requested container number
-      localityUnMatched.foreach(amClient.removeContainerRequest)
-      localityFree.foreach(amClient.removeContainerRequest)
-      val updatedNumContainer = missing + localityUnMatched.size + localityFree.size
+      // cancel "stale" requests for locations that are no longer needed
+      staleRequests.foreach { stale =>
+        amClient.removeContainerRequest(stale)
+      }
+      val cancelledContainers = staleRequests.size
+      logInfo(s"Canceled $cancelledContainers container requests (locality no longer needed)")
+
+      // consider the number of new containers and cancelled stale containers available
+      val availableContainers = missing + cancelledContainers
+
+      // to maximize locality, include requests with no locality preference that can be cancelled
+      val potentialContainers = availableContainers + anyHostRequests.size
 
       val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
-        updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
-        allocatedHostToContainersMap, localityMatched)
+        potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
+        allocatedHostToContainersMap, localRequests)
+
+      val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
+      containerLocalityPreferences.foreach {
+        case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
+          newLocalityRequests.append(createContainerRequest(resource, nodes, racks))
+        case _ =>
+      }
 
-      for (locality <- containerLocalityPreferences) {
-        val request = createContainerRequest(resource, locality.nodes, locality.racks)
+      if (availableContainers >= newLocalityRequests.size) {
+        // more containers are available than needed for locality, fill in requests for any host
+        for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
+          newLocalityRequests.append(createContainerRequest(resource, null, null))
+        }
+      } else {
+        val numToCancel = newLocalityRequests.size - availableContainers
+
+        // cancel some requests without locality preferences to schedule more local containers
+        anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
+          amClient.removeContainerRequest(nonLocal)
+        }
+        logInfo(s"Canceled $numToCancel container requests for any host to resubmit with locality")
+      }
+
+      newLocalityRequests.foreach { request =>
         amClient.addContainerRequest(request)
-        val nodes = request.getNodes
-        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
-        logInfo(s"Container request (host: $hostStr, capability: $resource)")
+        logInfo(s"Submitted container request (host: ${hostStr(request)}, capability: $resource)")
       }
     } else if (missing < 0) {
       val numToCancel = math.min(numPendingAllocate, -missing)
       logInfo(s"Canceling requests for $numToCancel executor containers")
@@ -298,6 +325,13 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def hostStr(request: ContainerRequest): String = {
+    Option(request.getNodes) match {
+      case Some(nodes) => nodes.asScala.mkString(",")
+      case None => "Any"
+    }
+  }
+
   /**
    * Creates a container request, handling the reflection required to use YARN features that were
    * added in recent versions.