diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b2cf022baf29f28b6f87782afaf5d702cb27ae7b..c11f1db0064fd2486057edbfc39379b86b0f3f2e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -419,7 +419,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
 
     override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
       val executorId = blockManagerAdded.blockManagerId.executorId
-      if (executorId != "<driver>") {
+      if (executorId != SparkContext.DRIVER_IDENTIFIER) {
         allocationManager.onExecutorAdded(executorId)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73668e83bbb1d2ec87825911bed4cb44f7f97c9c..6bfcd8ceae3c138fc0e84fc8590469974ef476da 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1333,6 +1333,8 @@ object SparkContext extends Logging {
 
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
 
+  private[spark] val DRIVER_IDENTIFIER = "<driver>"
+
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double) = 0.0
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6a6dfda363974ae52dd98452410922d1e0f2adc9..557d2f51283ae2537f14b12746e58a4568a9bd84 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -156,7 +156,7 @@ object SparkEnv extends Logging {
     assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
     val hostname = conf.get("spark.driver.host")
     val port = conf.get("spark.driver.port").toInt
-    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+    create(conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, true, isLocal, listenerBus)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 58b78f041cd852fb0fdd3436b8e7977c853e25f5..c0264836de7382c8f83eb0440073d622e17e4737 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import akka.actor.{Actor, ActorRef, Props}
 
-import org.apache.spark.{Logging, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
@@ -47,7 +47,7 @@ private[spark] class LocalActor(
 
   private var freeCores = totalCores
 
-  private val localExecutorId = "localhost"
+  private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
   private val localExecutorHostname = "localhost"
 
   val executor = new Executor(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 259f423c73e6b2732de42ffdaacdfd2af850cc60..b177a59c721df1797bc05809f5f5969f31ee323f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
@@ -59,7 +60,7 @@ class BlockManagerId private (
 
   def port: Int = port_
 
-  def isDriver: Boolean = (executorId == "<driver>")
+  def isDriver: Boolean = { executorId == SparkContext.DRIVER_IDENTIFIER }
 
   override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
     out.writeUTF(executorId_)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index d9066f766476e60e31e01b15b6c3b4ca27f47c3e..def49e80a3605003b356906c2339483019e06fa2 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 
@@ -59,10 +60,9 @@ class StorageStatusListener extends SparkListener {
     val info = taskEnd.taskInfo
     val metrics = taskEnd.taskMetrics
     if (info != null && metrics != null) {
-      val execId = formatExecutorId(info.executorId)
       val updatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
       if (updatedBlocks.length > 0) {
-        updateStorageStatus(execId, updatedBlocks)
+        updateStorageStatus(info.executorId, updatedBlocks)
       }
     }
   }
@@ -88,13 +88,4 @@ class StorageStatusListener extends SparkListener {
       }
     }
   }
-  /**
-   * In the local mode, there is a discrepancy between the executor ID according to the
-   * task ("localhost") and that according to SparkEnv ("<driver>"). In the UI, this
-   * results in duplicate rows for the same executor. Thus, in this mode, we aggregate
-   * these two rows and use the executor ID of "<driver>" to be consistent.
-   */
-  def formatExecutorId(execId: String): String = {
-    if (execId == "localhost") "<driver>" else execId
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 689cf02b25b70a4b093be13867a554ccb9eafeb2..9e0e71a51a408f04656b7aad88b3148aab88ff3a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -48,14 +48,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   def storageStatusList = storageStatusListener.storageStatusList
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val eid = formatExecutorId(taskStart.taskInfo.executorId)
+    val eid = taskStart.taskInfo.executorId
     executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val info = taskEnd.taskInfo
     if (info != null) {
-      val eid = formatExecutorId(info.executorId)
+      val eid = info.executorId
       executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
       executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
       taskEnd.reason match {
@@ -84,6 +84,4 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
     }
   }
 
-  // This addresses executor ID inconsistencies in the local mode
-  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1f1d53a1ee3b06d1825cc86278690d8ded6816c6..c6d71055920964fd6e2f872d6ab30009105a0491 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -27,7 +27,7 @@ import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
@@ -57,7 +57,9 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
 
-  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+  private def makeBlockManager(
+      maxMem: Long,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
       mapOutputTracker, shuffleManager, transfer)
@@ -108,7 +110,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       storeIds.filterNot { _ == stores(2).blockManagerId })
 
     // Add driver store and test whether it is filtered out
-    val driverStore = makeBlockManager(1000, "<driver>")
+    val driverStore = makeBlockManager(1000, SparkContext.DRIVER_IDENTIFIER)
     assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
     assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
     assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9d96202a3e7acddce99d0959dd1b1ba9c9138c04..715b740b857b26aedc9910ec7f008f30d1d29e84 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SecurityManager}
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
@@ -69,7 +69,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
 
-  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+  private def makeBlockManager(
+      maxMem: Long,
+      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
       mapOutputTracker, shuffleManager, transfer)
@@ -790,8 +792,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
-    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer(conf), 1200, conf,
-      mapOutputTracker, shuffleManager, transfer)
+    store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer)
 
     // The put should fail since a1 is not serializable.
     class UnserializableClass
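
Note: the patch above replaces every hard-coded "<driver>" executor ID (and local mode's mismatched "localhost") with the single constant SparkContext.DRIVER_IDENTIFIER, which is what lets the formatExecutorId aggregation workaround in StorageStatusListener and ExecutorsTab be deleted outright. Below is a minimal, self-contained sketch of that pattern; DriverIds, Id, and Demo are hypothetical stand-ins invented for illustration, not Spark classes.

// Hypothetical sketch of the pattern introduced by the patch above;
// DriverIds and Id are invented stand-ins, not Spark classes.
object DriverIds {
  // Single source of truth, analogous to SparkContext.DRIVER_IDENTIFIER.
  val DRIVER_IDENTIFIER = "<driver>"
}

// Analogous to BlockManagerId.isDriver: compare against the shared constant
// instead of re-typing the "<driver>" literal at each call site.
case class Id(executorId: String) {
  def isDriver: Boolean = executorId == DriverIds.DRIVER_IDENTIFIER
}

object Demo extends App {
  // With local mode also registering under DRIVER_IDENTIFIER (instead of
  // "localhost"), one filter handles every deployment mode, so no per-listener
  // ID normalization is needed.
  val ids = Seq(Id(DriverIds.DRIVER_IDENTIFIER), Id("1"), Id("2"))
  val peers = ids.filterNot(_.isDriver)
  assert(peers.forall(!_.isDriver))
  println(s"non-driver executors: ${peers.map(_.executorId).mkString(", ")}")
}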