Skip to content
Snippets Groups Projects
Commit af710e5b authored by Sun Rui's avatar Sun Rui Committed by Sean Owen
Browse files

[SPARK-16522][MESOS] Spark application throws exception on exit.

## What changes were proposed in this pull request?
Spark applications running on Mesos throw an exception on exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522.

I am not sure whether there is a better fix, so I will wait for review comments.

## How was this patch tested?
Tested manually: observed that the exception no longer occurs when the application exits.

Author: Sun Rui <sunrui2016@gmail.com>

Closes #14175 from sun-rui/SPARK-16522.
parent 801e4d09
No related branches found
No related tags found
No related merge requests found
...@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( ...@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskId: String, taskId: String,
reason: String): Unit = { reason: String): Unit = {
stateLock.synchronized { stateLock.synchronized {
removeExecutor(taskId, SlaveLost(reason)) // Do not call removeExecutor() after this scheduler backend was stopped because
// removeExecutor() internally sends a message to the driver endpoint, which is no longer
// available at that point, so calling it would throw an exception.
if (!stopCalled) {
removeExecutor(taskId, SlaveLost(reason))
}
slaves(slaveId).taskIDs.remove(taskId) slaves(slaveId).taskIDs.remove(taskId)
} }
} }
......
...@@ -21,6 +21,7 @@ import java.util.Collections ...@@ -21,6 +21,7 @@ import java.util.Collections
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._ import org.apache.mesos.Protos._
...@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter ...@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._ import org.apache.spark.scheduler.cluster.mesos.Utils._
...@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite ...@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var backend: MesosCoarseGrainedSchedulerBackend = _ private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _ private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _ private var driverEndpoint: RpcEndpointRef = _
@volatile private var stopCalled = false
test("mesos supports killing and limiting executors") { test("mesos supports killing and limiting executors") {
setBackend() setBackend()
...@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite ...@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage) assert(!dockerInfo.getForcePullImage)
} }
test("Do not call removeExecutor() after backend is stopped") {
  setBackend()

  // Launch a task on a valid offer so the backend has a running task to be notified about.
  val offers = List((backend.executorMemory(sc), 1))
  offerResources(offers)
  verifyTaskLaunched(driver, "o1")

  // Simulates a status update that arrives only after shutdown has begun: the thread polls
  // the suite-level `stopCalled` flag and then reports the task as finished.
  val statusUpdateThread = new Thread {
    // Daemon, so a flag that is never set cannot keep the test JVM alive.
    setDaemon(true)

    override def run(): Unit = {
      while (!stopCalled) {
        Thread.sleep(100)
      }

      val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
      backend.statusUpdate(driver, status)
    }
  }
  // Keep the Thread handle: `new Thread { ... }.start` would bind Unit and make join impossible.
  statusUpdateThread.start()

  backend.stop()

  // Wait (bounded) for the simulated status update to be delivered; otherwise the
  // never() verification below could run before statusUpdate() is called and pass vacuously.
  statusUpdateThread.join(10000L)
  assert(!statusUpdateThread.isAlive, "status update thread did not finish in time")

  // Any method of the backend involving sending messages to the driver endpoint should not
  // be called after the backend is stopped.
  verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
}
private def verifyDeclinedOffer(driver: SchedulerDriver, private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID, offerId: OfferID,
filter: Boolean = false): Unit = { filter: Boolean = false): Unit = {
...@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite ...@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
mesosDriver = newDriver mesosDriver = newDriver
} }
// Only records that the backend asked its executors to stop, by raising the suite-level
// `stopCalled` flag that the simulated status-update thread polls.
override def stopExecutors(): Unit = { stopCalled = true }
markRegistered() markRegistered()
} }
backend.start() backend.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment