Commit a1a2daa7 authored by Matei Zaharia
Merge pull request #317 from woggling/block-manager-heartbeat

Implement block manager heartbeat
parents a9ea14d6 b6b62d77
......@@ -88,7 +88,7 @@ object SparkEnv extends Logging {
val serializer = instantiateClass[Serializer]("spark.serializer", "spark.JavaSerializer")
val blockManagerMaster = new BlockManagerMaster(actorSystem, isMaster, isLocal)
val blockManager = new BlockManager(blockManagerMaster, serializer)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
val connectionManager = blockManager.connectionManager
......
......@@ -35,11 +35,15 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)
/* Start the Slaves */
for (slaveNum <- 1 to numSlaves) {
/* We can pretend to test distributed stuff by giving the slaves distinct hostnames.
All of 127/8 should be a loopback address; we use 127.100.*.* in hopes that it is
sufficiently distinctive. */
val slaveIpAddress = "127.100.0." + (slaveNum % 256)
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0)
slaveActorSystems += actorSystem
val actor = actorSystem.actorOf(
Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
slaveActors += actor
}
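A throwaway sketch of the addressing scheme used above (the object name is invented; only the expression inside the loop comes from the change itself):

object LoopbackAddressSketch {
  def main(args: Array[String]) {
    // Each simulated slave gets its own 127.100.0.x loopback address, so the
    // BlockManagerMaster keys them as distinct hosts even on a single machine.
    for (slaveNum <- 1 to 3) {
      println("slave " + slaveNum + " -> 127.100.0." + (slaveNum % 256))
    }
    // prints 127.100.0.1, 127.100.0.2 and 127.100.0.3
  }
}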
......
package spark.storage
import akka.actor.{ActorSystem, Cancellable}
import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
......@@ -12,7 +14,7 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
import spark.{CacheTracker, Logging, SizeEstimator, SparkEnv, SparkException, Utils}
import spark.network._
import spark.serializer.Serializer
import spark.util.ByteBufferInputStream
......@@ -45,13 +47,13 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
}
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
class BlockManager(actorSystem: ActorSystem, val master: BlockManagerMaster,
val serializer: Serializer, maxMemory: Long)
extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
......@@ -104,15 +106,27 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties
val host = System.getProperty("spark.hostname", Utils.localHostName())
@volatile private var shuttingDown = false
private def heartBeat() {
if (!master.mustHeartBeat(HeartBeat(blockManagerId))) {
reregister()
}
}
var heartBeatTask: Cancellable = null
initialize()
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
def this(master: BlockManagerMaster, serializer: Serializer) = {
this(master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
def this(actorSystem: ActorSystem, master: BlockManagerMaster, serializer: Serializer) = {
this(actorSystem, master, serializer, BlockManager.getMaxMemoryFromSystemProperties)
}
/**
......@@ -123,6 +137,43 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
BlockManagerWorker.startBlockManagerWorker(this)
if (!BlockManager.getDisableHeartBeatsForTesting) {
heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
heartBeat()
}
}
}
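For readers unfamiliar with the Akka 2.0-era scheduler this code relies on, here is a self-contained sketch of the same pattern (names and timings are illustrative, not part of the patch):

import akka.actor.{ActorSystem, Cancellable}
import akka.util.duration._

object SchedulerSketch {
  def main(args: Array[String]) {
    val system = ActorSystem("sketch")
    // Fire once immediately, then every 5000 ms, just like heartBeatTask above;
    // keeping the Cancellable lets a stop() method cancel the recurring task.
    val task: Cancellable = system.scheduler.schedule(0.seconds, 5000.milliseconds) {
      println("heart beat tick at " + System.currentTimeMillis())
    }
    Thread.sleep(12000)  // let a couple of ticks fire
    task.cancel()
    system.shutdown()
  }
}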
/**
* Report all blocks to the master again. This may be necessary if we are dropped
* by the master and come back, or if we become capable of recovering blocks on disk after
* an executor crash.
*
* This function deliberately fails silently if the master returns false (indicating that
* the slave needs to reregister). The error condition will be detected again by the next
* heart beat attempt or new block registration and another try to reregister all blocks
* will be made then.
*/
private def reportAllBlocks() {
logInfo("Reporting " + blockInfo.size + " blocks to the master.")
for (blockId <- blockInfo.keys) {
if (!tryToReportBlockStatus(blockId)) {
logError("Failed to report " + blockId + " to master; giving up.")
return
}
}
}
/**
* Reregister with the master and report all blocks to it. This will be called by the heart beat
* thread if our heart beat to the master indicates that we were not registered.
*/
def reregister() {
// TODO: We might need to rate limit reregistering.
logInfo("BlockManager reregistering with master")
master.mustRegisterBlockManager(
RegisterBlockManager(blockManagerId, maxMemory))
reportAllBlocks()
}
/**
......@@ -134,12 +185,25 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
* Tell the master about the current storage status of a block. This will send a heartbeat
* Tell the master about the current storage status of a block. This will send a block update
* message reflecting the current status, *not* the desired storage level in its block info.
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
val needReregister = !tryToReportBlockStatus(blockId)
if (needReregister) {
logInfo("Got told to reregister updating block " + blockId)
// Reregistering will report our new block for free.
reregister()
}
logDebug("Told master about block " + blockId)
}
/**
* Actually send a BlockUpdate message. Returns the master's response, which will be true if the
* block was successfully recorded and false if the slave needs to reregister.
*/
private def tryToReportBlockStatus(blockId: String): Boolean = {
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
......@@ -159,10 +223,11 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
logDebug("Told master about block " + blockId)
return master.mustBlockUpdate(
BlockUpdate(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
}
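Taken together, reportBlockStatus, tryToReportBlockStatus and reregister form a small slave-side protocol: a rejected block update means the master has forgotten us, so we register again and re-report everything. A condensed, runnable restatement (all names below are invented stand-ins, not Spark classes):

object SlaveReportingSketch {
  // Stand-in master that has forgotten the slave until it registers again.
  class MasterStub {
    private var known = false
    def blockUpdate(blockId: String): Boolean = known
    def register() { known = true; println("reregistered") }
  }

  // Condensed reportBlockStatus: a rejected update triggers reregistration,
  // which re-reports every block we hold (as reportAllBlocks does).
  def reportBlockStatus(master: MasterStub, blockId: String, allBlocks: Seq[String]) {
    if (!master.blockUpdate(blockId)) {
      master.register()
      allBlocks.foreach(id => println("re-reported " + id + ": " + master.blockUpdate(id)))
    }
  }

  def main(args: Array[String]) {
    val master = new MasterStub
    reportBlockStatus(master, "a1", Seq("a1", "a2"))
  }
}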
/**
* Get locations of the block.
*/
......@@ -840,6 +905,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
def stop() {
if (heartBeatTask != null) {
heartBeatTask.cancel()
}
connectionManager.stop()
blockInfo.clear()
memoryStore.clear()
......@@ -855,6 +923,12 @@ object BlockManager extends Logging {
(Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
def getHeartBeatFrequencyFromSystemProperties: Long =
System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
def getDisableHeartBeatsForTesting: Boolean =
System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
......
......@@ -26,7 +26,10 @@ case class RegisterBlockManager(
extends ToBlockManagerMaster
private[spark]
class HeartBeat(
case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
private[spark]
class BlockUpdate(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
......@@ -57,17 +60,17 @@ class HeartBeat(
}
private[spark]
object HeartBeat {
object BlockUpdate {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long): HeartBeat = {
new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
diskSize: Long): BlockUpdate = {
new BlockUpdate(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
def unapply(h: BlockUpdate): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
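The hand-written apply/unapply give BlockUpdate case-class ergonomics while leaving it a plain class with mutable fields (presumably for the Externalizable serialization it keeps from the old HeartBeat definition). A minimal construction-and-matching sketch (object name and values are illustrative; it assumes placement in the spark.storage package since these classes are private[spark]):

package spark.storage

object BlockUpdateMatchSketch {
  def main(args: Array[String]) {
    val id = new BlockManagerId("127.0.0.1", 10902)
    val update = BlockUpdate(id, "rdd_0_0", StorageLevel.MEMORY_ONLY, 400L, 0L)

    // unapply is what lets the master actor write `case BlockUpdate(...)`
    // in its receive block, exactly as if BlockUpdate were a case class.
    update match {
      case BlockUpdate(bmId, blockId, level, memSize, diskSize) =>
        println(blockId + " on " + bmId.ip + ":" + bmId.port + " at " + level +
          " (" + memSize + " bytes in memory, " + diskSize + " bytes on disk)")
    }
  }
}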
......@@ -90,6 +93,9 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object GetMemoryStatus extends ToBlockManagerMaster
private[spark]
case object ExpireDeadHosts extends ToBlockManagerMaster
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
......@@ -105,7 +111,7 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
blockManagerId.ip, blockManagerId.port, Utils.memoryBytesToString(maxMem)))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis() / 1000
_lastSeenMs = System.currentTimeMillis()
}
def updateBlockInfo(blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long)
......@@ -156,6 +162,8 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
def lastSeenMs: Long = _lastSeenMs
def blocks: JHashMap[String, StorageLevel] = _blocks
override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
def clear() {
......@@ -164,26 +172,84 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
}
private val blockManagerInfo = new HashMap[BlockManagerId, BlockManagerInfo]
private val blockManagerIdByHost = new HashMap[String, BlockManagerId]
private val blockInfo = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
initLogging()
val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
"5000").toLong
var timeoutCheckingTask: Cancellable = null
override def preStart() {
if (!BlockManager.getDisableHeartBeatsForTesting) {
timeoutCheckingTask = context.system.scheduler.schedule(
0.seconds, checkTimeoutInterval.milliseconds, self, ExpireDeadHosts)
}
super.preStart()
}
def removeBlockManager(blockManagerId: BlockManagerId) {
val info = blockManagerInfo(blockManagerId)
blockManagerIdByHost.remove(blockManagerId.ip)
blockManagerInfo.remove(blockManagerId)
var iterator = info.blocks.keySet.iterator
while (iterator.hasNext) {
val blockId = iterator.next
val locations = blockInfo.get(blockId)._2
locations -= blockManagerId
if (locations.size == 0) {
blockInfo.remove(blockId)
}
}
}
def expireDeadHosts() {
logDebug("Checking for hosts with no recent heart beats in BlockManagerMaster.")
val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
logInfo("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
toRemove += info.blockManagerId
}
}
// TODO: Remove corresponding block infos
toRemove.foreach(removeBlockManager)
}
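With the default properties the timing works out as follows (plain arithmetic, shown as a sketch; nothing here changes the real defaults):

object ExpiryTimingSketch {
  def main(args: Array[String]) {
    val heartBeatMs = 5000L                 // spark.storage.blockManagerHeartBeatMs default
    val slaveTimeoutMs = 3 * heartBeatMs    // spark.storage.blockManagerSlaveTimeoutMs default = 15000
    val checkIntervalMs = 5000L             // spark.storage.blockManagerTimeoutIntervalMs default

    // expireDeadHosts runs every checkIntervalMs and drops any slave whose
    // lastSeenMs is older than now - slaveTimeoutMs, so a slave has to miss
    // roughly three consecutive heart beats before it is removed.
    println("slave expired after ~" + slaveTimeoutMs + " ms of silence, checked every " +
      checkIntervalMs + " ms")
  }
}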
def removeHost(host: String) {
logInfo("Trying to remove the host: " + host + " from BlockManagerMaster.")
logInfo("Previous hosts: " + blockManagerInfo.keySet.toSeq)
val ip = host.split(":")(0)
val port = host.split(":")(1)
blockManagerInfo.remove(new BlockManagerId(ip, port.toInt))
blockManagerIdByHost.get(host).foreach(removeBlockManager)
logInfo("Current hosts: " + blockManagerInfo.keySet.toSeq)
sender ! true
}
def heartBeat(blockManagerId: BlockManagerId) {
if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
sender ! true
} else {
sender ! false
}
} else {
blockManagerInfo(blockManagerId).updateLastSeenMs()
sender ! true
}
}
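The reply semantics of the handler above, restated as a standalone truth table (the function and object are invented for illustration; the three branches mirror the ones in heartBeat()):

object HeartBeatReplySketch {
  // true  -> heart beat accepted (or the master deliberately ignores its own host)
  // false -> the master does not know this slave, which should reregister
  def reply(known: Boolean, isMastersOwnHost: Boolean, isLocalMode: Boolean): Boolean =
    if (known) true
    else if (isMastersOwnHost && !isLocalMode) true
    else false

  def main(args: Array[String]) {
    println(reply(known = true,  isMastersOwnHost = false, isLocalMode = false)) // true
    println(reply(known = false, isMastersOwnHost = true,  isLocalMode = false)) // true
    println(reply(known = false, isMastersOwnHost = false, isLocalMode = false)) // false -> reregister
  }
}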
def receive = {
case RegisterBlockManager(blockManagerId, maxMemSize) =>
register(blockManagerId, maxMemSize)
case HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
heartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
case BlockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
blockUpdate(blockManagerId, blockId, storageLevel, deserializedSize, size)
case GetLocations(blockId) =>
getLocations(blockId)
......@@ -205,8 +271,17 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
case StopBlockManagerMaster =>
logInfo("Stopping BlockManagerMaster")
sender ! true
if (timeoutCheckingTask != null) {
timeoutCheckingTask.cancel
}
context.stop(self)
case ExpireDeadHosts =>
expireDeadHosts()
case HeartBeat(blockManagerId) =>
heartBeat(blockManagerId)
case other =>
logInfo("Got unknown message: " + other)
}
......@@ -223,17 +298,25 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
if (blockManagerIdByHost.contains(blockManagerId.ip) &&
blockManagerIdByHost(blockManagerId.ip) != blockManagerId) {
val oldId = blockManagerIdByHost(blockManagerId.ip)
logInfo("Got second registration for host " + blockManagerId +
"; removing old slave " + oldId)
removeBlockManager(oldId)
}
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
blockManagerId, System.currentTimeMillis() / 1000, maxMemSize))
blockManagerId, System.currentTimeMillis(), maxMemSize))
}
blockManagerIdByHost += (blockManagerId.ip -> blockManagerId)
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
}
private def heartBeat(
private def blockUpdate(
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
......@@ -244,15 +327,21 @@ private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor
val tmp = " " + blockManagerId + " " + blockId + " "
if (!blockManagerInfo.contains(blockManagerId)) {
// Can happen if this is from a locally cached partition on the master
sender ! true
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
} else {
sender ! false
}
return
}
if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
logDebug("Got in heartBeat 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
logDebug("Got in block update 1" + tmp + " used " + Utils.getUsedTimeMs(startTimeMs))
sender ! true
return
}
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
......@@ -361,7 +450,6 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
val DEFAULT_MASTER_IP: String = System.getProperty("spark.master.host", "localhost")
val DEFAULT_MASTER_PORT: Int = System.getProperty("spark.master.port", "7077").toInt
val DEFAULT_MANAGER_IP: String = Utils.localHostName()
val DEFAULT_MANAGER_PORT: String = "10902"
val timeout = 10.seconds
var masterActor: ActorRef = null
......@@ -405,7 +493,7 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
def notifyADeadHost(host: String) {
communicate(RemoveHost(host + ":" + DEFAULT_MANAGER_PORT))
communicate(RemoveHost(host))
logInfo("Removed " + host + " successfully in notifyADeadHost")
}
......@@ -436,27 +524,49 @@ private[spark] class BlockManagerMaster(actorSystem: ActorSystem, isMaster: Bool
}
}
def mustHeartBeat(msg: HeartBeat) {
while (! syncHeartBeat(msg)) {
logWarning("Failed to send heartbeat" + msg)
def mustHeartBeat(msg: HeartBeat): Boolean = {
var res = syncHeartBeat(msg)
while (!res.isDefined) {
logWarning("Failed to send heart beat " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncHeartBeat(msg)
}
return res.get
}
def syncHeartBeat(msg: HeartBeat): Boolean = {
def syncHeartBeat(msg: HeartBeat): Option[Boolean] = {
try {
val answer = askMaster(msg).asInstanceOf[Boolean]
return Some(answer)
} catch {
case e: Exception =>
logError("Failed in syncHeartBeat", e)
return None
}
}
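mustHeartBeat/syncHeartBeat (and mustBlockUpdate/syncBlockUpdate below) share one Option-based retry idiom: the sync call returns None on a communication failure and the must wrapper loops until it gets a definite Boolean. A generic sketch of that idiom with made-up helper names:

object RetryIdiomSketch {
  val RequestRetryIntervalMs = 100L

  // Retry `attempt` until it yields a definite answer, sleeping between tries.
  def mustGet[T](what: String)(attempt: => Option[T]): T = {
    var res = attempt
    while (!res.isDefined) {
      println("Failed to " + what + ", retrying")
      Thread.sleep(RequestRetryIntervalMs)
      res = attempt
    }
    res.get
  }

  def main(args: Array[String]) {
    var calls = 0
    val ok = mustGet("send heart beat") {
      calls += 1
      if (calls < 3) None else Some(true)  // succeed on the third try
    }
    println("answer = " + ok + " after " + calls + " attempts")
  }
}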
def mustBlockUpdate(msg: BlockUpdate): Boolean = {
var res = syncBlockUpdate(msg)
while (!res.isDefined) {
logWarning("Failed to send block update " + msg)
Thread.sleep(REQUEST_RETRY_INTERVAL_MS)
res = syncBlockUpdate(msg)
}
return res.get
}
def syncBlockUpdate(msg: BlockUpdate): Option[Boolean] = {
val startTimeMs = System.currentTimeMillis()
val tmp = " msg " + msg + " "
logDebug("Got in syncHeartBeat " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
logDebug("Got in syncBlockUpdate " + tmp + " 0 " + Utils.getUsedTimeMs(startTimeMs))
try {
communicate(msg)
logDebug("Heartbeat sent successfully")
logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
return true
val answer = askMaster(msg).asInstanceOf[Boolean]
logDebug("Block update sent successfully")
logDebug("Got in synbBlockUpdate " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
return Some(answer)
} catch {
case e: Exception =>
logError("Failed in syncHeartBeat", e)
return false
logError("Failed in syncBlockUpdate", e)
return None
}
}
......
......@@ -74,7 +74,7 @@ private[spark] object ThreadingTest {
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
......
......@@ -14,10 +14,12 @@ import spark.util.ByteBufferInputStream
class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
var store: BlockManager = null
var store2: BlockManager = null
var actorSystem: ActorSystem = null
var master: BlockManagerMaster = null
var oldArch: String = null
var oldOops: String = null
var oldHeartBeat: String = null
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
val serializer = new KryoSerializer
......@@ -29,6 +31,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
oldOops = System.setProperty("spark.test.useCompressedOops", "true")
oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
......@@ -36,6 +39,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
after {
if (store != null) {
store.stop()
store = null
}
if (store2 != null) {
store2.stop()
store2 = null
}
actorSystem.shutdown()
actorSystem.awaitTermination()
......@@ -56,7 +64,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("manager-master interaction") {
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -85,8 +93,68 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size === 0, "master did not remove a2")
}
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") != None, "a1 was not in store")
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
assert(master.mustGetLocations(GetLocations("a1")).size > 0,
"a1 was not reregistered with master")
}
test("reregistration on block update") {
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
master.notifyADeadHost(store.blockManagerId.ip)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0,
"a1 was not reregistered with master")
assert(master.mustGetLocations(GetLocations("a2")).size > 0,
"master was not told about a2")
}
test("deregistration on duplicate") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
store = new BlockManager(actorSystem, master, serializer, 2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
store2 = new BlockManager(actorSystem, master, serializer, 2000)
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a1 was not removed from master")
store invokePrivate heartBeat()
assert(master.mustGetLocations(GetLocations("a1")).size > 0, "master was not told about a1")
store2 invokePrivate heartBeat()
assert(master.mustGetLocations(GetLocations("a1")).size == 0, "a2 was not removed from master")
}
test("in-memory LRU storage") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -105,7 +173,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU storage with serialization") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -124,7 +192,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of same RDD") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -143,7 +211,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU for partitions of multiple RDDs") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
......@@ -166,7 +234,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("on-disk storage") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -179,7 +247,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -194,7 +262,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with getLocalBytes") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -209,7 +277,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -224,7 +292,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("disk and memory storage with serialization and getLocalBytes") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -239,7 +307,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
......@@ -264,7 +332,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("in-memory LRU with streams") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
......@@ -288,7 +356,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("LRU with mixed storage levels and streams") {
store = new BlockManager(master, serializer, 1200)
store = new BlockManager(actorSystem, master, serializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200))
......@@ -334,7 +402,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("overly large block") {
store = new BlockManager(master, serializer, 500)
store = new BlockManager(actorSystem, master, serializer, 500)
store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
......@@ -345,49 +413,49 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
test("block compression") {
try {
System.setProperty("spark.shuffle.compress", "true")
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.shuffle.compress", "false")
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "true")
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.broadcast.compress", "false")
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "true")
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
store.stop()
store = null
System.setProperty("spark.rdd.compress", "false")
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
store = new BlockManager(master, serializer, 2000)
store = new BlockManager(actorSystem, master, serializer, 2000)
store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
store.stop()
......