From 23b5da14ed6413b0dcb4c0bfdb80c98c433f3c9d Mon Sep 17 00:00:00 2001 From: Reynold Xin <reynoldx@gmail.com> Date: Mon, 29 Jul 2013 17:42:05 -0700 Subject: [PATCH] Moved block manager messages into BlockManagerMessages object. --- .../spark/storage/BlockManagerMaster.scala | 1 + .../storage/BlockManagerMasterActor.scala | 2 + .../spark/storage/BlockManagerMessages.scala | 163 ++++++++---------- .../storage/BlockManagerSlaveActor.scala | 2 +- .../spark/storage/BlockManagerSource.scala | 2 +- .../scala/spark/storage/BlockMessage.scala | 1 - .../spark/storage/BlockMessageArray.scala | 5 +- .../spark/storage/BlockObjectWriter.scala | 2 - 8 files changed, 82 insertions(+), 96 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 3186f7c85b..76128e8cff 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -23,6 +23,7 @@ import akka.pattern.ask import akka.util.Duration import spark.{Logging, SparkException} +import spark.storage.BlockManagerMessages._ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 244000d952..011bb6b83d 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -29,6 +29,8 @@ import akka.util.Duration import akka.util.duration._ import spark.{Logging, Utils, SparkException} +import spark.storage.BlockManagerMessages._ + /** * BlockManagerMasterActor is an actor on the master node to track statuses of diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index 01de4ccb8f..9375a9ca54 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -22,102 +22,89 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import akka.actor.ActorRef -////////////////////////////////////////////////////////////////////////////////// -// Messages from the master to slaves. -////////////////////////////////////////////////////////////////////////////////// -private[spark] -sealed trait ToBlockManagerSlave - -// Remove a block from the slaves that have it. This can only be used to remove -// blocks that the master knows about. -private[spark] -case class RemoveBlock(blockId: String) extends ToBlockManagerSlave - -// Remove all blocks belonging to a specific RDD. -private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave - - -////////////////////////////////////////////////////////////////////////////////// -// Messages from slaves to the master. -////////////////////////////////////////////////////////////////////////////////// -private[spark] -sealed trait ToBlockManagerMaster - -private[spark] -case class RegisterBlockManager( - blockManagerId: BlockManagerId, - maxMemSize: Long, - sender: ActorRef) - extends ToBlockManagerMaster - -private[spark] -case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster - -private[spark] -class UpdateBlockInfo( - var blockManagerId: BlockManagerId, - var blockId: String, - var storageLevel: StorageLevel, - var memSize: Long, - var diskSize: Long) - extends ToBlockManagerMaster - with Externalizable { - - def this() = this(null, null, null, 0, 0) // For deserialization only - - override def writeExternal(out: ObjectOutput) { - blockManagerId.writeExternal(out) - out.writeUTF(blockId) - storageLevel.writeExternal(out) - out.writeLong(memSize) - out.writeLong(diskSize) +private[storage] object BlockManagerMessages { + ////////////////////////////////////////////////////////////////////////////////// + // Messages from the master to slaves. + ////////////////////////////////////////////////////////////////////////////////// + sealed trait ToBlockManagerSlave + + // Remove a block from the slaves that have it. This can only be used to remove + // blocks that the master knows about. + case class RemoveBlock(blockId: String) extends ToBlockManagerSlave + + // Remove all blocks belonging to a specific RDD. + case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave + + + ////////////////////////////////////////////////////////////////////////////////// + // Messages from slaves to the master. + ////////////////////////////////////////////////////////////////////////////////// + sealed trait ToBlockManagerMaster + + case class RegisterBlockManager( + blockManagerId: BlockManagerId, + maxMemSize: Long, + sender: ActorRef) + extends ToBlockManagerMaster + + case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + + class UpdateBlockInfo( + var blockManagerId: BlockManagerId, + var blockId: String, + var storageLevel: StorageLevel, + var memSize: Long, + var diskSize: Long) + extends ToBlockManagerMaster + with Externalizable { + + def this() = this(null, null, null, 0, 0) // For deserialization only + + override def writeExternal(out: ObjectOutput) { + blockManagerId.writeExternal(out) + out.writeUTF(blockId) + storageLevel.writeExternal(out) + out.writeLong(memSize) + out.writeLong(diskSize) + } + + override def readExternal(in: ObjectInput) { + blockManagerId = BlockManagerId(in) + blockId = in.readUTF() + storageLevel = StorageLevel(in) + memSize = in.readLong() + diskSize = in.readLong() + } } - override def readExternal(in: ObjectInput) { - blockManagerId = BlockManagerId(in) - blockId = in.readUTF() - storageLevel = StorageLevel(in) - memSize = in.readLong() - diskSize = in.readLong() + object UpdateBlockInfo { + def apply(blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) + } + + // For pattern-matching + def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + } } -} -private[spark] -object UpdateBlockInfo { - def apply(blockManagerId: BlockManagerId, - blockId: String, - storageLevel: StorageLevel, - memSize: Long, - diskSize: Long): UpdateBlockInfo = { - new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) - } + case class GetLocations(blockId: String) extends ToBlockManagerMaster - // For pattern-matching - def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) - } -} + case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster -private[spark] -case class GetLocations(blockId: String) extends ToBlockManagerMaster + case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster -private[spark] -case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster + case class RemoveExecutor(execId: String) extends ToBlockManagerMaster -private[spark] -case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster + case object StopBlockManagerMaster extends ToBlockManagerMaster -private[spark] -case class RemoveExecutor(execId: String) extends ToBlockManagerMaster + case object GetMemoryStatus extends ToBlockManagerMaster -private[spark] -case object StopBlockManagerMaster extends ToBlockManagerMaster + case object ExpireDeadHosts extends ToBlockManagerMaster -private[spark] -case object GetMemoryStatus extends ToBlockManagerMaster - -private[spark] -case object ExpireDeadHosts extends ToBlockManagerMaster - -private[spark] -case object GetStorageStatus extends ToBlockManagerMaster + case object GetStorageStatus extends ToBlockManagerMaster +} diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala index 45cffad810..6e5fb43732 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala @@ -19,7 +19,7 @@ package spark.storage import akka.actor.Actor -import spark.{Logging, SparkException, Utils} +import spark.storage.BlockManagerMessages._ /** diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala index 4faa715c94..2aecd1ea71 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala @@ -3,7 +3,7 @@ package spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import spark.metrics.source.Source -import spark.storage._ + private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { val metricRegistry = new MetricRegistry() diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index ab72dbb62b..bcce26b7c1 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import scala.collection.mutable.StringBuilder import scala.collection.mutable.ArrayBuffer -import spark._ import spark.network._ private[spark] case class GetBlock(id: String) diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index b0229d6124..ee2fc167d5 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -19,7 +19,6 @@ package spark.storage import java.nio.ByteBuffer -import scala.collection.mutable.StringBuilder import scala.collection.mutable.ArrayBuffer import spark._ @@ -113,7 +112,7 @@ private[spark] object BlockMessageArray { def main(args: Array[String]) { val blockMessages = - (0 until 10).map(i => { + (0 until 10).map { i => if (i % 2 == 0) { val buffer = ByteBuffer.allocate(100) buffer.clear @@ -121,7 +120,7 @@ private[spark] object BlockMessageArray { } else { BlockMessage.fromGetBlock(GetBlock(i.toString)) } - }) + } val blockMessageArray = new BlockMessageArray(blockMessages) println("Block message array created") diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala index 01ed6e8c1f..3812009ca1 100644 --- a/core/src/main/scala/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala @@ -17,8 +17,6 @@ package spark.storage -import java.nio.ByteBuffer - /** * An interface for writing JVM objects to some underlying storage. This interface allows -- GitLab