diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala index 25593c596b892e48c146bf04362ed18878f695aa..694db6b2a30667b5758d32d5ffd68835096cdc67 100644 --- a/core/src/main/scala/spark/SparkEnv.scala +++ b/core/src/main/scala/spark/SparkEnv.scala @@ -31,7 +31,7 @@ class SparkEnv ( shuffleFetcher.stop() shuffleManager.stop() blockManager.stop() - BlockManagerMaster.stopBlockManagerMaster() + blockManager.master.stop() actorSystem.shutdown() actorSystem.awaitTermination() } @@ -66,9 +66,9 @@ object SparkEnv { val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer") val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer] - BlockManagerMaster.startBlockManagerMaster(actorSystem, isMaster, isLocal) + val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal) - val blockManager = new BlockManager(serializer) + val blockManager = new BlockManager(blockManagerMaster, serializer) val connectionManager = blockManager.connectionManager diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 436c16cdddebd9cad11fc1aa97ffbfe94cf8fd2c..f7472971b55b6e72e9e66fa63594e6ea153b3811 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -498,7 +498,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with if (!deadHosts.contains(host)) { logInfo("Host lost: " + host) deadHosts += host - BlockManagerMaster.notifyADeadHost(host) + env.blockManager.master.notifyADeadHost(host) // TODO: This will be really slow if we keep accumulating shuffle map stages for ((shuffleId, stage) <- shuffleToMapStage) { stage.removeOutputsOnHost(host) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 15131960d608918c844b9cc957380aec97096ec3..1ab8485ce4a841a296e1e6ee9ec69c5115fc080b 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -66,8 +66,8 @@ class BlockLocker(numLockers: Int) { } - -class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging { +class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) + extends Logging { case class BlockInfo(level: StorageLevel, tellMaster: Boolean) @@ -94,15 +94,16 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging /** * Construct a BlockManager with a memory limit set based on system properties. */ - def this(serializer: Serializer) = - this(BlockManager.getMaxMemoryFromSystemProperties(), serializer) + def this(master: BlockManagerMaster, serializer: Serializer) = { + this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties) + } /** * Initialize the BlockManager. Register to the BlockManagerMaster, and start the * BlockManagerWorker actor. */ private def initialize() { - BlockManagerMaster.mustRegisterBlockManager( + master.mustRegisterBlockManager( RegisterBlockManager(blockManagerId, maxMemory, maxMemory)) BlockManagerWorker.startBlockManagerWorker(this) } @@ -154,7 +155,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging */ def getLocations(blockId: String): Seq[String] = { val startTimeMs = System.currentTimeMillis - var managers = BlockManagerMaster.mustGetLocations(GetLocations(blockId)) + var managers = master.mustGetLocations(GetLocations(blockId)) val locations = managers.map(_.ip) logDebug("Get block locations in " + Utils.getUsedTimeMs(startTimeMs)) return locations @@ -165,7 +166,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging */ def getLocations(blockIds: Array[String]): Array[Seq[String]] = { val startTimeMs = System.currentTimeMillis - val locations = BlockManagerMaster.mustGetLocationsMultipleBlockIds( + val locations = master.mustGetLocationsMultipleBlockIds( GetLocationsMultipleBlockIds(blockIds)).map(_.map(_.ip).toSeq).toArray logDebug("Get multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) return locations @@ -235,7 +236,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging } logDebug("Getting remote block " + blockId) // Get locations of block - val locations = BlockManagerMaster.mustGetLocations(GetLocations(blockId)) + val locations = master.mustGetLocations(GetLocations(blockId)) // Get block from remote locations for (loc <- locations) { @@ -321,8 +322,8 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { throw new BlockException(oneBlockId, "Unexpected message received from " + cmId) } - val buffer = blockMessage.getData() - val blockId = blockMessage.getId() + val buffer = blockMessage.getData + val blockId = blockMessage.getId val block = dataDeserialize(buffer) blocks.update(blockId, Some(block)) logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime)) @@ -490,7 +491,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) { val tLevel: StorageLevel = new StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1) - var peers = BlockManagerMaster.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) + var peers = master.mustGetPeers(GetPeers(blockManagerId, level.replication - 1)) for (peer: BlockManagerId <- peers) { val start = System.nanoTime logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " @@ -564,7 +565,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging } private def notifyMaster(heartBeat: HeartBeat) { - BlockManagerMaster.mustHeartBeat(heartBeat) + master.mustHeartBeat(heartBeat) } def stop() { @@ -576,12 +577,9 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging } } - -object BlockManager extends Logging { +object BlockManager { def getMaxMemoryFromSystemProperties(): Long = { val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble - val bytes = (Runtime.getRuntime.totalMemory * memoryFraction).toLong - logInfo("Maximum memory to use: " + bytes) - bytes + (Runtime.getRuntime.totalMemory * memoryFraction).toLong } } diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 97a5b0cb45eca0bbfb96cf2310c4e7e5a4f30221..9f03c5a32c2929da2fff2ec33202824a72ea5683 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -85,7 +85,7 @@ case class RemoveHost(host: String) extends ToBlockManagerMaster case object StopBlockManagerMaster extends ToBlockManagerMaster -class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging { +class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging { class BlockManagerInfo( timeMs: Long, @@ -134,19 +134,19 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging { } } - def getLastSeenMs(): Long = { + def getLastSeenMs: Long = { return lastSeenMs } - def getRemainedMem(): Long = { + def getRemainedMem: Long = { return remainedMem } - def getRemainedDisk(): Long = { + def getRemainedDisk: Long = { return remainedDisk } - override def toString(): String = { + override def toString: String = { return "BlockManagerInfo " + timeMs + " " + remainedMem + " " + remainedDisk } @@ -329,8 +329,8 @@ class BlockManagerMaster(val isLocal: Boolean) extends Actor with Logging { } } -object BlockManagerMaster extends Logging { - initLogging() +class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) + extends Logging { val AKKA_ACTOR_NAME: String = "BlockMasterManager" val REQUEST_RETRY_INTERVAL_MS = 100 @@ -342,20 +342,18 @@ object BlockManagerMaster extends Logging { val timeout = 10.seconds var masterActor: ActorRef = null - def startBlockManagerMaster(actorSystem: ActorSystem, isMaster: Boolean, isLocal: Boolean) { - if (isMaster) { - masterActor = actorSystem.actorOf( - Props(new BlockManagerMaster(isLocal)), name = AKKA_ACTOR_NAME) - logInfo("Registered BlockManagerMaster Actor") - } else { - val url = "akka://spark@%s:%s/user/%s".format( - DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME) - logInfo("Connecting to BlockManagerMaster: " + url) - masterActor = actorSystem.actorFor(url) - } + if (isMaster) { + masterActor = actorSystem.actorOf( + Props(new BlockManagerMasterActor(isLocal)), name = AKKA_ACTOR_NAME) + logInfo("Registered BlockManagerMaster Actor") + } else { + val url = "akka://spark@%s:%s/user/%s".format( + DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME) + logInfo("Connecting to BlockManagerMaster: " + url) + masterActor = actorSystem.actorFor(url) } - def stopBlockManagerMaster() { + def stop() { if (masterActor != null) { communicate(StopBlockManagerMaster) masterActor = null diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index 501183ab1fc92e2f8bdd86c703dadd07b8a04990..c61e2802527deb6e9a4d5b93c3a93e8964c0b137 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -48,15 +48,15 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging { } def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = { - blockMessage.getType() match { + blockMessage.getType match { case BlockMessage.TYPE_PUT_BLOCK => { - val pB = PutBlock(blockMessage.getId(), blockMessage.getData(), blockMessage.getLevel()) + val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) logInfo("Received [" + pB + "]") putBlock(pB.id, pB.data, pB.level) return None } case BlockMessage.TYPE_GET_BLOCK => { - val gB = new GetBlock(blockMessage.getId()) + val gB = new GetBlock(blockMessage.getId) logInfo("Received [" + gB + "]") val buffer = getBlock(gB.id) if (buffer == null) { diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index bb128dce7a6b8ad45c476c59d87ccf17c77ab667..0b2ed69e0786d7bda94b73470b490336da9871ba 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -102,23 +102,23 @@ class BlockMessage() extends Logging{ set(buffer) } - def getType(): Int = { + def getType: Int = { return typ } - def getId(): String = { + def getId: String = { return id } - def getData(): ByteBuffer = { + def getData: ByteBuffer = { return data } - def getLevel(): StorageLevel = { + def getLevel: StorageLevel = { return level } - def toBufferMessage(): BufferMessage = { + def toBufferMessage: BufferMessage = { val startTime = System.currentTimeMillis val buffers = new ArrayBuffer[ByteBuffer]() var buffer = ByteBuffer.allocate(4 + 4 + id.length() * 2) @@ -128,7 +128,7 @@ class BlockMessage() extends Logging{ buffers += buffer if (typ == BlockMessage.TYPE_PUT_BLOCK) { - buffer = ByteBuffer.allocate(8).putInt(level.toInt()).putInt(level.replication) + buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication) buffer.flip() buffers += buffer @@ -164,7 +164,7 @@ class BlockMessage() extends Logging{ return Message.createBufferMessage(buffers) } - override def toString(): String = { + override def toString: String = { "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level + ", data = " + (if (data != null) data.remaining.toString else "null") + "]" } @@ -209,11 +209,11 @@ object BlockMessage { def main(args: Array[String]) { val B = new BlockMessage() B.set(new PutBlock("ABC", ByteBuffer.allocate(10), StorageLevel.DISK_AND_MEMORY_2)) - val bMsg = B.toBufferMessage() + val bMsg = B.toBufferMessage val C = new BlockMessage() C.set(bMsg) - println(B.getId() + " " + B.getLevel()) - println(C.getId() + " " + C.getLevel()) + println(B.getId + " " + B.getLevel) + println(C.getId + " " + C.getLevel) } } diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index 5f411d34884e12871405b12b24dcb0765af01427..a108ab653e73cdc395b5f2dabbc4374259cc6d44 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -123,13 +123,13 @@ object BlockMessageArray { val newBlockMessageArray = BlockMessageArray.fromBufferMessage(newBufferMessage) println("Converted back to block message array") newBlockMessageArray.foreach(blockMessage => { - blockMessage.getType() match { + blockMessage.getType match { case BlockMessage.TYPE_PUT_BLOCK => { - val pB = PutBlock(blockMessage.getId(), blockMessage.getData(), blockMessage.getLevel()) + val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel) println(pB) } case BlockMessage.TYPE_GET_BLOCK => { - val gB = new GetBlock(blockMessage.getId()) + val gB = new GetBlock(blockMessage.getId) println(gB) } } diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index 693a679c4e79fc7adb788fc705e93f79af76b3f2..f067a2a6c5838de2e767b7d0bb5b30111c1eeb7a 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -11,11 +11,8 @@ class StorageLevel( // TODO: Also add fields for caching priority, dataset ID, and flushing. - def this(booleanInt: Int, replication: Int) { - this(((booleanInt & 4) != 0), - ((booleanInt & 2) != 0), - ((booleanInt & 1) != 0), - replication) + def this(flags: Int, replication: Int) { + this((flags & 4) != 0, (flags & 2) != 0, (flags & 1) != 0, replication) } def this() = this(false, true, false) // For deserialization @@ -33,25 +30,25 @@ class StorageLevel( false } - def isValid() = ((useMemory || useDisk) && (replication > 0)) + def isValid = ((useMemory || useDisk) && (replication > 0)) - def toInt(): Int = { + def toInt: Int = { var ret = 0 if (useDisk) { - ret += 4 + ret |= 4 } if (useMemory) { - ret += 2 + ret |= 2 } if (deserialized) { - ret += 1 + ret |= 1 } return ret } override def writeExternal(out: ObjectOutput) { - out.writeByte(toInt().toByte) - out.writeByte(replication.toByte) + out.writeByte(toInt) + out.writeByte(replication) } override def readExternal(in: ObjectInput) { @@ -62,7 +59,7 @@ class StorageLevel( replication = in.readByte() } - override def toString(): String = + override def toString: String = "StorageLevel(%b, %b, %b, %d)".format(useDisk, useMemory, deserialized, replication) } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 1ed5519d37a29af89a43d48983adc2958f9fccd5..61decd81e6e1ffa2e70f6819d3beb69fff6ccdce 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -5,27 +5,29 @@ import java.nio.ByteBuffer import akka.actor._ import org.scalatest.FunSuite -import org.scalatest.BeforeAndAfterEach +import org.scalatest.BeforeAndAfter import spark.KryoSerializer import spark.util.ByteBufferInputStream -class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { +class BlockManagerSuite extends FunSuite with BeforeAndAfter { var actorSystem: ActorSystem = null + var master: BlockManagerMaster = null - override def beforeEach() { + before { actorSystem = ActorSystem("test") - BlockManagerMaster.startBlockManagerMaster(actorSystem, true, true) + master = new BlockManagerMaster(actorSystem, true, true) } - override def afterEach() { + after { actorSystem.shutdown() actorSystem.awaitTermination() actorSystem = null + master = null } test("manager-master interaction") { - val store = new BlockManager(2000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 2000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -41,21 +43,21 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { assert(store.getSingle("a3") != None, "a3 was not in store") // Checking whether master knows about the blocks or not - assert(BlockManagerMaster.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") - assert(BlockManagerMaster.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") - assert(BlockManagerMaster.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") + assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1") + assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") + assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") // Setting storage level of a1 and a2 to invalid; they should be removed from store and master store.setLevel("a1", new StorageLevel(false, false, false, 1)) store.setLevel("a2", new StorageLevel(true, false, false, 0)) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") - assert(BlockManagerMaster.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") - assert(BlockManagerMaster.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") + assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") + assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2") } test("in-memory LRU storage") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -76,7 +78,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("in-memory LRU storage with serialization") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -97,7 +99,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("on-disk storage") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -110,7 +112,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("disk and memory storage") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -124,7 +126,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("disk and memory storage with serialization") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -138,7 +140,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("LRU with mixed storage levels") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -164,7 +166,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("in-memory LRU with streams") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -190,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfterEach { } test("LRU with mixed storage levels and streams") { - val store = new BlockManager(1000, new KryoSerializer) + val store = new BlockManager(master, new KryoSerializer, 1000) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200))