Skip to content
Snippets Groups Projects
Commit 25311c2c authored by Thomas Graves's avatar Thomas Graves
Browse files

[SPARK-3456] YarnAllocator on alpha can lose container requests to RM

Author: Thomas Graves <tgraves@apache.org>

Closes #2373 from tgravescs/SPARK-3456 and squashes the following commits:

77e9532 [Thomas Graves] [SPARK-3456] YarnAllocator on alpha can lose container requests to RM
parent af258382
No related branches found
No related tags found
No related merge requests found
......@@ -48,16 +48,17 @@ private[yarn] class YarnAllocationHandler(
private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
override protected def allocateContainers(count: Int): YarnAllocateResponse = {
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
logDebug("numExecutors: " + count)
logDebug("asking for additional executors: " + count + " with already pending: " + pending)
val totalNumAsk = count + pending
if (count <= 0) {
resourceRequests = List()
} else if (preferredHostToCount.isEmpty) {
logDebug("host preferences is empty")
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
AllocationType.ANY, null, totalNumAsk, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
......@@ -80,7 +81,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
count,
totalNumAsk,
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
......@@ -103,7 +104,7 @@ private[yarn] class YarnAllocationHandler(
req.addAllReleases(releasedContainerList)
if (count > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(count,
logInfo("Allocating %d executor containers with %d of memory each.".format(totalNumAsk,
executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
......
......@@ -112,6 +112,9 @@ private[yarn] abstract class YarnAllocator(
def allocateResources() = {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
// this is needed by alpha, do it here since we add numPending right after this
val executorsPending = numPendingAllocate.get()
if (missing > 0) {
numPendingAllocate.addAndGet(missing)
logInfo("Will Allocate %d executor containers, each with %d memory".format(
......@@ -121,7 +124,7 @@ private[yarn] abstract class YarnAllocator(
logDebug("Empty allocation request ...")
}
val allocateResponse = allocateContainers(missing)
val allocateResponse = allocateContainers(missing, executorsPending)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
......@@ -435,9 +438,10 @@ private[yarn] abstract class YarnAllocator(
*
* @param count Number of containers to allocate.
* If zero, should still contact RM (as a heartbeat).
* @param pending Number of containers pending allocate. Only used on alpha.
* @return Response to the allocation request.
*/
protected def allocateContainers(count: Int): YarnAllocateResponse
protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
/** Called to release a previously allocated container. */
protected def releaseContainer(container: Container): Unit
......
......@@ -47,7 +47,8 @@ private[yarn] class YarnAllocationHandler(
amClient.releaseAssignedContainer(container.getId())
}
override protected def allocateContainers(count: Int): YarnAllocateResponse = {
// pending isn't used on stable as the AMRMClient handles incremental asks
override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
addResourceRequests(count)
// We have already set the container request. Poll the ResourceManager for a response.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment