Skip to content
Snippets Groups Projects
Commit 2c04c8a1 authored by Josh Rosen's avatar Josh Rosen Committed by Patrick Wendell
Browse files

[SPARK-7563] OutputCommitCoordinator.stop() should only run on the driver

This fixes a bug where an executor that exits can cause the driver's OutputCommitCoordinator to stop. To fix this, we use an `isDriver` flag and check it in `stop()`.

See https://issues.apache.org/jira/browse/SPARK-7563 for more details.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #6197 from JoshRosen/SPARK-7563 and squashes the following commits:

04b2cc5 [Josh Rosen] [SPARK-7563] OutputCommitCoordinator.stop() should only be executed on the driver
parent e7454564
No related branches found
No related tags found
No related merge requests found
@@ -379,7 +379,7 @@ object SparkEnv extends Logging {
     }
     val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
-      new OutputCommitCoordinator(conf)
+      new OutputCommitCoordinator(conf, isDriver)
     }
     val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
...
@@ -38,7 +38,7 @@ private case class AskPermissionToCommitOutput(stage: Int, task: Long, taskAttempt: Long)
  * This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests)
  * for an extensive design discussion.
  */
-private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) extends Logging {

   // Initialized by SparkEnv
   var coordinatorRef: Option[RpcEndpointRef] = None
@@ -129,9 +129,11 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
   }

   def stop(): Unit = synchronized {
-    coordinatorRef.foreach(_ send StopCoordinator)
-    coordinatorRef = None
-    authorizedCommittersByStage.clear()
+    if (isDriver) {
+      coordinatorRef.foreach(_ send StopCoordinator)
+      coordinatorRef = None
+      authorizedCommittersByStage.clear()
+    }
   }

   // Marked private[scheduler] instead of private so this can be mocked in tests
...
@@ -81,7 +81,7 @@ class OutputCommitCoordinatorSuite extends FunSuite with BeforeAndAfter {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    outputCommitCoordinator = spy(new OutputCommitCoordinator(conf))
+    outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
     // Use Mockito.spy() to maintain the default infrastructure everywhere else.
     // This mocking allows us to control the coordinator responses in test cases.
     SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))
...
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment