diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 3186f7c85b16404e582906ba2ec1f89a57d6832e..76128e8cfffe9f0773f7807d950bc56554ebdc93 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -23,6 +23,7 @@ import akka.pattern.ask import akka.util.Duration import spark.{Logging, SparkException} +import spark.storage.BlockManagerMessages._ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 244000d95276468088747863018723323b24a3c0..011bb6b83d70025ac5c9e055b90dafee34f50538 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -29,6 +29,8 @@ import akka.util.Duration import akka.util.duration._ import spark.{Logging, Utils, SparkException} +import spark.storage.BlockManagerMessages._ + /** * BlockManagerMasterActor is an actor on the master node to track statuses of diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index 01de4ccb8f45face3494d4600cae2c9c0974def8..9375a9ca54825138d3988a6615af8d941ebff06a 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -22,102 +22,89 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import akka.actor.ActorRef -////////////////////////////////////////////////////////////////////////////////// -// Messages from the master to slaves. -////////////////////////////////////////////////////////////////////////////////// -private[spark] -sealed trait ToBlockManagerSlave - -// Remove a block from the slaves that have it. This can only be used to remove -// blocks that the master knows about. -private[spark] -case class RemoveBlock(blockId: String) extends ToBlockManagerSlave - -// Remove all blocks belonging to a specific RDD. -private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave - - -////////////////////////////////////////////////////////////////////////////////// -// Messages from slaves to the master. -////////////////////////////////////////////////////////////////////////////////// -private[spark] -sealed trait ToBlockManagerMaster - -private[spark] -case class RegisterBlockManager( - blockManagerId: BlockManagerId, - maxMemSize: Long, - sender: ActorRef) - extends ToBlockManagerMaster - -private[spark] -case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster - -private[spark] -class UpdateBlockInfo( - var blockManagerId: BlockManagerId, - var blockId: String, - var storageLevel: StorageLevel, - var memSize: Long, - var diskSize: Long) - extends ToBlockManagerMaster - with Externalizable { - - def this() = this(null, null, null, 0, 0) // For deserialization only - - override def writeExternal(out: ObjectOutput) { - blockManagerId.writeExternal(out) - out.writeUTF(blockId) - storageLevel.writeExternal(out) - out.writeLong(memSize) - out.writeLong(diskSize) +private[storage] object BlockManagerMessages { + ////////////////////////////////////////////////////////////////////////////////// + // Messages from the master to slaves. + ////////////////////////////////////////////////////////////////////////////////// + sealed trait ToBlockManagerSlave + + // Remove a block from the slaves that have it. This can only be used to remove + // blocks that the master knows about. + case class RemoveBlock(blockId: String) extends ToBlockManagerSlave + + // Remove all blocks belonging to a specific RDD. + case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave + + + ////////////////////////////////////////////////////////////////////////////////// + // Messages from slaves to the master. + ////////////////////////////////////////////////////////////////////////////////// + sealed trait ToBlockManagerMaster + + case class RegisterBlockManager( + blockManagerId: BlockManagerId, + maxMemSize: Long, + sender: ActorRef) + extends ToBlockManagerMaster + + case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + + class UpdateBlockInfo( + var blockManagerId: BlockManagerId, + var blockId: String, + var storageLevel: StorageLevel, + var memSize: Long, + var diskSize: Long) + extends ToBlockManagerMaster + with Externalizable { + + def this() = this(null, null, null, 0, 0) // For deserialization only + + override def writeExternal(out: ObjectOutput) { + blockManagerId.writeExternal(out) + out.writeUTF(blockId) + storageLevel.writeExternal(out) + out.writeLong(memSize) + out.writeLong(diskSize) + } + + override def readExternal(in: ObjectInput) { + blockManagerId = BlockManagerId(in) + blockId = in.readUTF() + storageLevel = StorageLevel(in) + memSize = in.readLong() + diskSize = in.readLong() + } } - override def readExternal(in: ObjectInput) { - blockManagerId = BlockManagerId(in) - blockId = in.readUTF() - storageLevel = StorageLevel(in) - memSize = in.readLong() - diskSize = in.readLong() + object UpdateBlockInfo { + def apply(blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) + } + + // For pattern-matching + def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + } } -} -private[spark] -object UpdateBlockInfo { - def apply(blockManagerId: BlockManagerId, - blockId: String, - storageLevel: StorageLevel, - memSize: Long, - diskSize: Long): UpdateBlockInfo = { - new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) - } + case class GetLocations(blockId: String) extends ToBlockManagerMaster - // For pattern-matching - def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) - } -} + case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster -private[spark] -case class GetLocations(blockId: String) extends ToBlockManagerMaster + case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster -private[spark] -case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster + case class RemoveExecutor(execId: String) extends ToBlockManagerMaster -private[spark] -case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster + case object StopBlockManagerMaster extends ToBlockManagerMaster -private[spark] -case class RemoveExecutor(execId: String) extends ToBlockManagerMaster + case object GetMemoryStatus extends ToBlockManagerMaster -private[spark] -case object StopBlockManagerMaster extends ToBlockManagerMaster + case object ExpireDeadHosts extends ToBlockManagerMaster -private[spark] -case object GetMemoryStatus extends ToBlockManagerMaster - -private[spark] -case object ExpireDeadHosts extends ToBlockManagerMaster - -private[spark] -case object GetStorageStatus extends ToBlockManagerMaster + case object GetStorageStatus extends ToBlockManagerMaster +} diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala index 45cffad810ef5a74222f06ac13108007f64aa604..6e5fb43732905e66deedb0ea6e2bb17272e908a2 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala @@ -19,7 +19,7 @@ package spark.storage import akka.actor.Actor -import spark.{Logging, SparkException, Utils} +import spark.storage.BlockManagerMessages._ /** diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala index 4faa715c9475fce4035e02804ba4a075a5cf13aa..2aecd1ea7167662c9ef858abb646a72ed856a9c3 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala @@ -3,7 +3,7 @@ package spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import spark.metrics.source.Source -import spark.storage._ + private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { val metricRegistry = new MetricRegistry() diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index ab72dbb62bad14aa44149ba0e88340f2da0d944e..bcce26b7c14e94316b6818b9eef47156f0503cce 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import scala.collection.mutable.StringBuilder import scala.collection.mutable.ArrayBuffer -import spark._ import spark.network._ private[spark] case class GetBlock(id: String) diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index b0229d6124d63b40462cb0c8bb44644865e8422b..ee2fc167d5b5198bfafebe634f6f3d1b8c2d8fc4 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -19,7 +19,6 @@ package spark.storage import java.nio.ByteBuffer -import scala.collection.mutable.StringBuilder import scala.collection.mutable.ArrayBuffer import spark._ @@ -113,7 +112,7 @@ private[spark] object BlockMessageArray { def main(args: Array[String]) { val blockMessages = - (0 until 10).map(i => { + (0 until 10).map { i => if (i % 2 == 0) { val buffer = ByteBuffer.allocate(100) buffer.clear @@ -121,7 +120,7 @@ private[spark] object BlockMessageArray { } else { BlockMessage.fromGetBlock(GetBlock(i.toString)) } - }) + } val blockMessageArray = new BlockMessageArray(blockMessages) println("Block message array created") diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala index 01ed6e8c1fa321f4f02aeee9e81e6a570412aaf5..3812009ca16709bda71e60366180cda77a2b2536 100644 --- a/core/src/main/scala/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala @@ -17,8 +17,6 @@ package spark.storage -import java.nio.ByteBuffer - /** * An interface for writing JVM objects to some underlying storage. This interface allows